Yazan : Şadi Evren ŞEKER

Bu yazının amacı, başta işletim sistemleri olmak üzere, bilgisayar bilimlerinin pek çok alanında geçen ve klasik bir koşutzamanlı (concurrent) problem örneği olan üretici / tüketici (producer / consumer ) örneğini açıklamaktır.

Problemin çeşitli değiştirilmiş tanımları olmasına karşılık, tanımı gayet basittir. Aynı anda çalışan iki iş (process) bulunmaktadır ve bunlardan birisi üretmekte diğeri de bu üretilen ürünü tüketmektedir.

Buradaki sorun, üretilen ürün henüz tüketilmeden ikincisinin üretilmemesi veya henüz ürün üretilmeden tüketilmemesidir.

Problemin biraz değiştirilmiş halinde sınırlı bir bant üzerinde üretim yapılmakta, dolayısıyla üretim işi (producer process) henüz tüketim yapılmadan birden fazla ürün üretebilmektedir. Bu probleme sınırlı bant (bounded buffer) ismi verilmektedir ancak burada da bir sınır bulunmaktadır. Örneğin bant sınırı 10 ise, 10 ürün üretildikten sonra bir tane ürün tüketilmeden yeni ürün üretilememektedir.

Problemin bir diğer hali de birden fazla üretici ve tüketici olması durumudur. Örneğin aynı bant üzerine aynı anda 3 ayrı üretici ürettikleri ürünleri koyabilmekteler veya aynı anda 5 ayrı tüketici üretilen bu ürünleri tüketebilmektedirler. Elbette bu yeni halinde, aynı ürünü iki tüketicinin tüketMEmesi veya aynı bant aralığına aynı anda iki üreticinin üretim yapmaması gerekir.

Problem tanımı itibariyle, işlemler arası iletişimi gerektirmektedir (interprocess communication, IPC) ve iki işlem birbiri ile iletişime geçerek senkronize olmalıdır. Yani tüketici, ne zaman tüketeceğine, üreticinin hareketlerine bakarak karar vermekte, benzer şekilde üretici de ne zaman üreteceğine, tüketicinin hareketlerine bakarak karar vermektedir.

Yarış Durumu (Race Condition)

Problemin çözümü için bilinmesi gereken en önemli durum yarış durumudur. Buna göre kodlama sırasında iki işlem (process) aynı anda çalışacağına göre, hangisinin yazılan hangi satırı önce yapacağı bilinemez.

Paylaşılmış Değişken (Shared Variable)

İki işlemin aynı anda erişebildiği, okuyabildiği veya içeriğini değiştirebildiği değişkenlere verilen isimdir. Herhangi bir cinsten olabilir (örneğin int) ve anlık olarak bir işlem erişebilir. Ancak, değişkene erişim, işlemler arası senkronizasyonla kontrol edilmezse anlık olarak kimin eriştiğinin bilinmesi imkansızdır. Kısaca, kim önce gelirse erişir.

Örnek Çözüm

Aşağıdaki şekilde bir kodlama yapmaya çalışalım (bu kod tam çözüm değildir):

Kodlamak istediğimiz durum, yukarıdaki anlatılan problem çeşitlerinden ilkidir. Yani üretici tek bir ürün üretir ve tüketici de bu ürünü tüketir.

int sayac = 0;
procedure producer() {
    while (true) {
        urun = uret();

        if (sayac == LIMIT) {
            sleep();
        }

        urunuSun(urun);
        sayac = sayac + 1;

        if (sayac == 1) {
            wakeup(consumer);
        }
    }
}

procedure consumer() {
    while (true) {

        if (sayac == 0) {
            sleep();
        }

        urun = urunuAl();
        sayac = sayac - 1;

        if (sayac == LIMIT - 1) {
            wakeup(producer);
        }

        tuket(urun);
    }
}

Yukarıdaki kodda, yapılmak istenen iki ayrı işlemin de çalışmaları sırasında kullanacakları iki fonksiyonu kodlamaktır. Buna göre üretici işlemi producer() fonksiyonunu çalıştırırken, tüketici işlemi consumer() fonksiyonunu çalıştıracaktır.

Dikkat edileceği üzere iki işlem de sonsuz döngüdedir, yani while(true) döngüsü altında sonsuza kadar dönmektedir, amacımız sonsuza kadar üreticinin ürünü üretip sunması, tüketicinin de tüketmesidir.

Basitçe bir üretici, sayaç isimli paylaşılmış değişkeni (shared variable) kullanarak tüketici ile iletişime geçer. Şayet sayaç değeri limite ulaştıysa, daha fazla üretim yapılması mümkün değil demektir ve bu durumda üretici beklemelidir. Şayet sayaç değeri 0’a ulaştıysa, bu durumda da daha fazla tüketim yapılması mümkün değildir ve tüketici beklemelidir.

Bu çözümde ölümcül kilitlenme olma ihtimali bulunduğu için tam çözüm olarak kabul edilmesi mümkün değildir.

Örneğin aşağıdaki senaryoyu ele alalım:

  1. Tüketici tarafından sayaç değişkeni okundu ve değeri 0 olduğu için bekleme yapılan if bloğuna girildi.

  2. Tüketici if bloğuna girdiği anda (ancak henüz uyumadan yani sleep() fonksiyonu çağrılmadan) üretici tarafı da çalıştı ve bir ürün üreterek banta koydu.

  3. Tüketicinin ürün üretmesi sırasında sayaç değeri de arttırıldı

  4. Ayrıca tüketici, kendi sırası bitip sıranın üreticiye geçmesinden dolayı, üreticiyi de uyandırdı (üretici henüz uyumamıştı dolayısıyla boşuna yapılan bir uyandırma)

  5. Ardından tüketici beklemekte olduğu sleep() fonksiyonuna girdi ve üretici de sayaç değeri 1 olduğu için uyuma durumuna geçti

Sonuç olarak iki işlem de uyumaktadır. Bu durumda bir ebedi kilitlenme (deadlock) oluşacak ve işlemler asla çalışmayacaktır.

Semaphore Kullanarak Çözüm:

Yukarıdaki çözümde kullanılan paylaşılmış değişken yaklaşımını bu sefer, işletim sistemlerindeki bir senkronizasyon yöntemi olan semaforları kullanarak çözmeye çalışalım.

Temel olarak bir semafor (ilgili yazıyı da okuyabilirsiniz) iki işlem arasında bir trafik ışığı benzeri şekilde davranarak anlık olarak bir tanesinin çalışmasını sağlar.

Semaforların üzerinde çalıştığı değişkenler bulunur ve bu değişkenlere erişim anlık olarak semafor yapısı tarafından kontrol altına alınmıştır. Semaforların iki temel fonksiyonu wait (bekle) ve signal (geç) şeklindedir. Trafik ışıklarının kırmız (wait, bekle) ve yeşil (geç, signal) olması şeklinde düşünülebilir.

Yazının başında anlatılan 3 tip problemden, üçüncüsünün, yani birden fazla üretici ve tüketici olması durumunun, semafor olarak çözümü aşağıda verilmiştir (tam çözüm değildir):

semaphore doluluk = 0;
semaphore bosluk = LIMIT; 

procedure producer() {
    while (true) {
        item = produceItem();
        bekle(bosluk);
            urunuKoy(item);
        gec(doluluk);
    }
}

procedure consumer() {
    while (true) {
        bekle(doluluk);
            urun = urunuAl();
        gec(bosluk);
        tuket(urun);
    }
}

Yukarıdaki çözümde, birden fazla işlem bir öncekine benzer şekilde aynı anda çalışmakta ve bu sefer doluluk ve boşluk isimleri verilen iki farklı semafor kullanılmaktadır. Basitçe, doluluk semaforu, üreticiyi bekletmekte, boşluk semaforu da tüketiciyi bekletmektedir (hat boşken tüketilemediğini ve hat doluyken üretilemediğini hatırlayınız).

Yukarıdaki kodda, daha sonra anlatılacak olan eşbağımsızlık (mutually exclusive) problemi bulunmaktadır. Bu problemi aşağıdaki adımların gerçekleştiği senaryoda görebiliriz:

  1. İki üretici aynı anda boşluk değişkenini azaltır (bekle fonksiyonunu çağırır)

  2. Üreticilerden sadece birisi üretim hattındaki boşluğu bulup ürünü koyar.

  3. İkinci üretici, ürettiği ürünü, üretim hattına yerleştirmek istediğinde, birinci üretici tarafından zaten yerleştirilmiş olduğu için problem oluşur. Bu aşamada kodlanan dile göre, mevcut ürünün üzerine yazılır şeklinde düşünebiliriz.

Netice olarak iki üretici de aynı alana ürün koymu ve neticede iki üretici çalışmış ve sanki iki ürün üretilmiş gibi semafor değeri değiştirilmiş buna karşılık sadece bir ürün üretilmiştir. Bu durum tüketim sırasında sorunlara yol açacaktır.

Bu problemin çözümü için aynı cinsten, birden fazla işlem (process) olması durumlarının kontrol edilmesi gerekir. Bu kontrole eşbağımsızlık (mutual exclusion) ismi verilir ve aşağıdaki şekilde çözülebilir:

semaphore mutex = 1;
semaphore doluluk = 0; // items produced
semaphore bosluk = LIMIT; // remaining space

procedure producer() {
    while (true) {
        item = produceItem();
        bekle(bosluk);
          bekle(mutex);
            urunuKoy(item);
          gec(mutex);
        gec(doluluk);
    }
}

procedure consumer() {
    while (true) {
        bekle(doluluk);
          bekle(mutex)
            urun = urunuAl();
          gec(mutex);
        gec(bosluk);
        tuket(urun);
    }
}

Yukarıdaki yeni çözümde, işlemler arasında (üretici <-> tüketici) eşitleme olduğu kadar, aynı cinsten işlemler arasında da eşitleme yapılmışıtr.

mutex: Bu semafor, anlık olarak tek işlemin çalışmasını sağlar. Peki zaten tek işlem çalışıyorsa ilave semafora ne gerek var? Diyebilirsiniz ancak tek işlem çalışması, ürün yokken tüketilmesini veya hat doluyken üretilmesini engellemez.

Kısacası yukarıdaki yeni çözüm, bir öncekindeki üretici / tüketici eş zamanlamasına ilave olarak aynı anda birden fazla üretici ve tüketicinin de çalışmasını engellemektedir.

 

Yorumlar

  1. viensdans

    selam
    aşagıdaki problem için bir cözüm önerisine ihtiyacim var. yani programı yazarken nasıl bir yol izlemem gerekiyor . thread-safe derken ne kastediliyor. eş zamanli olarak race probleminden nasıl kaçınılacak,ürünlerin değeri ratgele bir değerde olmakla ne kastediliyor

    phyton da üretici -tüketici thread-safe uygulaması yapılacak. iplikler “üretici” ve “tüketici” cüzdana benzer bir fonksiyonla Wallet denen ortak bir kaynakla calısacak.
    iplik “üretici” ürünleri “Wallet”e ekleyecek ve iplik “tüketici” ürünleri “Wallet”den tüketecek.
    tüketilecek yeterli ürün yok ise , “tüketici” kaynağın tekrar dolmasını bekleyecek.
    kaynaktaki yapılan calışma “Wallet” eş zamanlı olacak ve race probleminden kacınılacak.
    eklenen ve tüketilen ürünlerin miktari 0-100 arasi rastgele bir değerde olacak. yani cesitli iplikler calisirken her dönüste üreten, rastgele seçilen bir değer kullanacak ve tüketen baska birini.
    iki ipliğin çalışmasındaki geçerli sonuç komut satırında text olarak görüntülenecek.
    Teşekkürler

  2. viensdans

    yada su kodlarla hangi yöntemler uygulanmis diye sorabilirim.

    import time
    from threading import Thread, Lock
    
    def main():
        global lock		
        global wallet  	
        lock = Lock()
        wallet = 0		
    
        thread1 = Producer()
        thread1.start()
        
        thread2 = Consumer()
        thread2.start()
    
    
    class Producer(Thread):
        def run(self):
    	    global wallet	
    	    lock.acquire()
                for i in range(0, 100):
    		wallet = wallet + i
                    print 'Add ' , i, "to wallet that now equals ", wallet
                    time.sleep(1)
    	    lock.release()
    
    
    class Consumer(Thread):
        def run(self):
    	global wallet		
    #	lock.acquire()
    	for i in range(0, 100):
    		wallet = wallet - i
                    print 'Subtract ' , i, "from wallet that now equals ", wallet
                    time.sleep(.25)
    #	lock.release()
    
    
    if __name__ == '__main__':
        main()
    
  3. Anonim

    Bana çok yardımcı oldu. Saatlerdir okuyorum, ancak yeni anladım Semaphore ve Mutex kavramlarını. Katkınız için çok teşekkürler.

  4. viensdans

    senkronizasyon cözumleri icin bir suru secenekler var
    disable itterrupts, dijktra’s semafor, test and set instruction, monitor,simple lock..
    ben nasil karar verecegim hangi senronizasyon metodu kullanacagima, bunlarin arasinda ne fark varki.

  5. viensdans

    yada producer consumer problemi icin hangi senkronizasyon metodlari uyulanabilir. sadece lock mu rlock, semefor, bounded semafor , event , condition metodlarida uyugulanbilir mi bu problem icin , uygulnamiyorsa neden uygulanmaz.

  6. Şadi Evren ŞEKER Article Author

    Bahsettiğiniz yöntemlerin bir kısmı diğerleri ile karşılaştırılamayacak yöntemler ama kısaca açıklamaya çalışayım. Önce basitten başlayalım.

    Birincisi monitorler sadece nesne yönelimli oramda kullanılır. Yani java C# gibi object oriented bir dil kullanmıyorsanız monitor ile işiniz olmaz.

    semaphore, conditional variable ve lock kavramları birbirine alternatif kullanılabilir. Genelde CV (conditional varible) threadler için uygundur çünkü bunlarda paylaşılmış değişken (shared variable) vardır. Ancak işlemler (process) için de kullanılabilir ama bu durumda IPC (inter process communication) yanı işlemler arası iletişim yapmanız gerekir.

    Ama semaphore kullanıyorsanız zaten semaphore yapı olarak bu mesajlaşmayı da içerir yani ilave bir mesaj geçirme ile uğraşmazsınız.

    test and set ise tamamen başka bir olay. Şöyle anlatayım. Örneğin bir değişkenin değerini arttıracaksınız (diyelim ki integer semaphore kullanıyorsunuz ve semaphore değerini arttıracaksınız) bu durumda mevcut değerini okumanız sonra arttırmanız gerekir. Ve bu iki işlem atomic değildir. Yani siz değeri okudunuz ve henüz arttırmadan başka birisi okuyup arttırabilir (diyelim ki siz 10 okudunuz tam 11 yapmadan başka bir işlem gelip 10 okudu ve 11 yaptı, siz de üzerine 11 yapacaksınız ve aslında değişken iki kere arttırlıp 12 olacakken 11 olmuş olur)

    İşte test and set böyle durumlarda kullanılan ve donanım tarafından (cpu) desteklenen bir instructiondir.

    Sorunuzdan anladığım kadarıyla kavramları oturtmakta biraz zorluk yaşıyorsunuz, her konuyu ayrı ayrı çalıştıktan sonra karşılaştırmak daha yerinde olabilir.

    Başarılar

  7. viensdans

    #tuketici yeterli kaynak yok ise nasil bekletilir asagidaki program calistiginda kaynaklar eksi #sayiya dusuyor. queue mi kullanilmasi gerekiyor nasil kodlalayabilirim, parogramci degilim network #management okuyorum bu dersi sinirlerimi ölcmek icin koymus olmalilar herhalde.:)

    #!/usr/bin/python
    # -*- coding: cp1252 -*
    # python27
    import random
    import time
    from threading import Thread, Lock
    
    
    def main():
    	global lock 
    	global wallet
    	
    	
    	
    	lock = Lock()
    	wallet = 0 
    	
    
    	Thread1 = Producer()
    	Thread1.start() 
    
    	Thread2 = Consumer()
    	Thread2.start() 
    
    class Producer(Thread): 
    	
    	def run(self): 
    		global wallet 
    	
    		
                    for i in range(30):
    			lock.acquire()
    			if wallet >= 50:
                                    print "uretici kaynak ekleme sayisi", i, " cuzdan su anki miktar ", wallet, "nn"
    				print " kaynak doldu uretim durdu"
    				lock.release() #kiliti birak tuketiciye kaynaklari tuketmesi icin izin ver
    				time.sleep(0.25) 
    				lock.acquire() #uretim icin tekrar kilitliyor
    				print "producer uretime basladi tekrar"
    			if wallet < 50: # wallet de 50 den az kaynak varsa uretecek
    				
    				i = random.randint(0,100) #kaynak karisik sayida wallete eklenecek
    				wallet = wallet + i
    				print "uretici kaynak ekleme sayisi", i, " cuzdan su anki miktar ", wallet, "nn"
    			
    			
    			lock.release() 
    			time.sleep(.75) 
    	
    			
    class Consumer(Thread):
        def run(self):
    	global wallet		
    
    	lock.acquire()
    	for i in range(5):
                    i = random.randint(0,100)
    		wallet = wallet - i
                    print "cikartilan kaynak sayisi " , i, "cuzdandaki miktar ", wallet
                    time.sleep(1)
    
    	lock.release()
    
    
    if __name__ == "__main__":
        main()
    
  8. viensdans

    Lock ve Queue isleminde basarili olamadigim icin birde condition da sansimi denedim burda da kaynaga wallet = wallet + 1 islemini yapmayi cözemedim. yani o sekilde tek bir sayi yukselip alcaliyordu burdaysa liste ekleyen bi mekanizma var, ilk gelen sayi ilk gidiyor (fifo olmasi gerek) ve ayrica ureticiyi 1 saniye yerine 2 , tuketiciyide 2 saniye yerine 1 saniye uyutunca kaynakta tek bir sayi oluyor ve sadece onu tuketiyor. yani karisik sayi eklemis oluyor ama karisik sayi cikartmamis gibi oluyor sanki. dogru yola girmek icin wallet = wallet +1 islemini gerceklestirmem gerekiyorsa nasil kodlanmasi gerekiyor.?

    
    import random
    import threading
    import time
     
    class Producer(threading.Thread):
        #def __init__(self, wallet, not_empty, not_full):
        def __init__(self, wallet, Condition):
            threading.Thread.__init__(self)
     
            # degisken 
            self.wallet = wallet
       
        def run(self):
            
            for i in range(10):# 10 defa uret islemi yapacak
                # Sleep for 1 second (data uretimini engeleyecek 1 saniye icin)
                time.sleep(1)
               
                Condition.acquire() # dolu degil kilitlen
     
                while len(self.wallet) == 20: # cuzdanin kapasitesi 20 madde aliyor. wallet dolu  bosalana kadar durdur.
                    
                    Condition.wait() #  dolu degil bekle
     
                i = random.randint(0, 100) # cuzdana karsik sayilar ekle
                self.wallet.append(i)
     
                Condition.notify() # kaynak bos degil bekleyen tuketiciyi yeniden baslat 
                
                Condition.release() # kaynak dolu degil,  birak
                
                print 'Eklenen ' , i, "suan cuzdanda bulunanlar ", wallet
     
    class Consumer(threading.Thread):
        
        def __init__(self, wallet, Condition):
            threading.Thread.__init__(self)
     
            self.wallet = wallet # depolama degiskeni
            
        def run(self):
            
            for i in range(10): # tuketici 10 kere calisacak.
                
                Condition.acquire() # kaynak bos degil kilitlen
     
                
                while not self.wallet: # cuzdanda kaynak bulunana kadar bekle
                    
                    Condition.wait() # bos degil bekle
     
                
                i = self.wallet.pop(0) # kaynaktaki ilk maddeyi tuket
     
                
                
                Condition.notify() # kaynak dolu degil. ureticiyi tekrar calistir
                
                Condition.release() # kaynak bos degil birak
     
                
                time.sleep(2) # 2 saniye data tuketim islemini durdur.
                
                print 'tuket ' , i, "cuzdanda kalan sayilar ", wallet
     
    wallet = [] # uretici ve tuketici burada calisacak, python listeleri kullanilacak
    Condition = threading.Condition() # kilit be bekleme kosullari yap
    thread1 = Producer(wallet, Condition)
    thread1.start()
    thread2 = Consumer(wallet, Condition)
    thread2.start()
    
  9. viensdans

    queue kullanildigi zaman uretici ve tuketici probleminde diger senkronizasyon islemlerini yapmaya gerek kalmiyor mu, kritik alan filan kullanmaya gerek yok mu,. queue de bir nevi lock, rlock, semafore.. islevini mi göruyor. lock örnegimde urunler eksi sayilara dusuyor du asagidaki queue de denedigimde urun varsa tuketiliyor. wallet degiskeni q = Queue.Queue() nin yerine mi gecti. yada lock ile queue birlikte mi kullanilmasi gerekiyor. lutfen yardim !!!

    #!/usr/bin/env python 
    # 
    import random
    import time 
    import Queue 
    from threading import Thread
    
    def main(): 
        q = Queue.Queue()        
         
        p = Producer(q) 
        p.daemon = True 
        p.start() 
         
        c = Consumer(q) 
        c.daemon = True 
        c.start() 
         
        while True: 
            time.sleep(.5) 
     
     
    class Producer(Thread): 
        def __init__(self, q): 
            Thread.__init__(self) 
            self.q = q 
             
        def run(self): 
            while True: 
                for i in range(10):
                    i = random.randint(0, 100)
                    self.q.put(i) 
                    print('put %i on queue' % i) 
                    time.sleep(.25) 
     
     
    class Consumer(Thread): 
        def __init__(self, q): 
            Thread.__init__(self) 
            self.q = q 
         
        def run(self): 
            while True: 
                try: 
                    i = self.q.get(False)
                    i = random.randint(0, 100)
                    print('got %i from queue' % i) 
                except Queue.Empty: 
                    time.sleep(.25) 
     
     
    if __name__ == '__main__': 
        main()
    
  10. Şadi Evren ŞEKER Article Author

    Kusura bakmayın yardımcı olamadım, biraz kişisel yoğunluğum biraz da python diline hakim olmamam yüzünden. Kısaca durumu şu şekilde anlatabilirim. Bu tip senkronizasyon problemlerinde, erişimin sırayla yapılması gerekir. Yani aynı veri ünitesine (değişken veya veri yapısı örneğin queue) erişim sırayla olmalı. Şayet sırayla erişmez ve aynı anda erişirlerse birisinin yaptığı değişikliği tamamlamandan diğeri erişerek eski veriyi alabilir. Yani hangi haline erişildiği garanti edilemez.
    Bu durumda, kritik alan bu tip verilere erişilen alanlardır ve sizin bahsettiğiniz gibi kilitlerle (lock) kontrol edilmelidir.

    Vakit bulabilirsem python diline bakıp konu hakkında birşeyler yazmaya çalışırım.

    Başarılar

  11. immortal

    Bir bilgisayar mühendisi adayı olarak benim en büyük kaynağım sizin siteniz.Her konuda aradığımı burda buluyorum.Çok teşekürler bu yazı içinde:)

  12. immortal

    tarif ettiğiniz son çözüm yolunda sadece aynı buffer alanında birden fazla process çalışamaz değilmi?,yani farklı buffer alanlarında birden fazla üretici veya tüketici çaloşabilir?

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir