Single Producer Single Consumer nu heb ik meerdere consumenten nodig

Ik heb een Situatie waarin ik een eenvoudig Producer Consumer-model heb geschreven om in brokken data van Bluetooth te lezen en vervolgens elke 10k bytes die ik naar bestand heb geschreven. Ik gebruikte een standaard P-C-model met een Vector als mijn berichthouder. Dus hoe kan ik dit veranderen zodat meerdere Thread-consumenten dezelfde berichten kunnen lezen, ik denk dat de term Multicaster zou zijn? Ik gebruik dit eigenlijk op een Android-telefoon, dus JMS is waarschijnlijk geen optie.

static final int MAXQUEUE = 50000; 
private Vector messages = new Vector(); 

/**
 * Put the message in the queue for the Consumer Thread
 */
private synchronized void putMessage(byte[] send) throws InterruptedException { 

    while ( messages.size() == MAXQUEUE ) 
        wait(); 
    messages.addElement( send ); 
    notify(); 
} 


/**
 * This method is called by the consumer to see if any messages in the queue
 */
public synchronized byte[] getMessage()throws InterruptedException { 
    notify(); 
    while ( messages.size() == 0 && !Thread.interrupted()) {
        wait(1); 
    }
    byte[] message = messages.firstElement(); 
    messages.removeElement( message ); 
    return message; 
} 

Ik verwijs naar de code van een Oreilly-boek Message Parser sectie

1

3 antwoord

Pub-sub mechanisme is absoluut de manier om te bereiken wat je wilt. Ik weet niet zeker waarom het ontwikkelen voor Android je zal belemmeren om JMS te gebruiken, wat net zo eenvoudig is als een specificatie. Uitchecken this thread on SO.

1
toegevoegd
Ja, maar dat spreekt vooral over het gebruik van JMS, wat een soort overboord is voor wat ik nodig heb in een telefoonapp die berichten verzendt tussen threads.
toegevoegd de auteur JPM, de bron
Zal in dat geval het eenvoudig inpakken van uw Vector (of een wachtrij zoals Hanno Binder suggereerde) in een Observer/Waarneembaar patroon doen?
toegevoegd de auteur mazaneicha, de bron

Je moet zeker een wachtrij gebruiken in plaats van de Vector
Geef elke thread een eigen wachtrij en, wanneer een nieuw bericht wordt ontvangen, voeg voeg() het nieuwe bericht toe aan de wachtrij van elke thread. Voor flexibiliteit kan een listenerpatroon ook nuttig zijn.

Bewerk:

Ok, ik vind dat ik ook een voorbeeld moet toevoegen:
(Klassiek waarnemerspatroon)

Dit is de interface die alle consumenten moeten implementeren:

public interface MessageListener {
  public void newMessage( byte[] message );
}

Een producent kan er als volgt uitzien:

public class Producer {
  Collection listeners = new ArrayList();


 //Allow interested parties to register for new messages
  public void addListener( MessageListener listener ) {
    this.listeners.add( listener );
  }

  public void removeListener( Object listener ) {
    this.listeners.remove( listener );
  }

  protected void produceMessages() {
    byte[] msg = new byte[10];

   //Create message and put into msg

   //Tell all registered listeners about the new message:
    for ( MessageListener l : this.listeners ) {
      l.newMessage( msg );
    }

  }
}

En een consumentenklasse zou kunnen zijn (met behulp van een blokkeerwachtrij die dat alles doet wait() ing en notify() ing voor ons):

public class Consumer implements MessageListener {

  BlockingQueue< byte[] > queue = new LinkedBlockingQueue< byte[] >();

 //This implements the MessageListener interface:
  @Override
  public void newMessage( byte[] message ) {
    try {
      queue.put( message );
    } catch (InterruptedException e) {
       //won't happen.
    }
  }

   //Execute in another thread:       
    protected void handleMessages() throws InterruptedException {
        while ( true ) {
            byte[] newMessage = queue.take();

           //handle the new message.
        }
    }


}
0
toegevoegd
Zowel Vector als wachtrij kunnen alleen objecten bevatten, geen primtives. Gebruik blokkeringswachtrijen en uw volledige code hierboven wordt zo eenvoudig als wachtrij.put (bericht) en wachtrij.teken ().
toegevoegd de auteur JimmyB, de bron
Ik ben meer geneigd naar uw voorbeeld toe te kijken, omdat ik de SettableFuture-machine niet kan laten werken.
toegevoegd de auteur JPM, de bron
Ik heb altijd zoiets behandeld met Luisteraars, zelfs een volledig GUI-statusluisterprogramma geschreven voor het opnemen van realtime gegevens uit een JMS-systeem en het bijwerken van aangepaste componenten met deze statuswijzigingen.
toegevoegd de auteur JPM, de bron

Dit is wat ik bedacht als een voorbeeld bij het doorzoeken van een code en het wijzigen van enkele bestaande voorbeelden.

package test.messaging;

import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class TestProducerConsumers {

    static Broker broker;

    public TestProducerConsumers(int maxSize) {
        broker = new Broker(maxSize);
        Producer p = new Producer();
        Consumer c1 = new Consumer("One");
        broker.consumers.add(c1);
        c1.start();

        Consumer c2 = new Consumer("Two");
        broker.consumers.add(c2);
        c2.start();

        p.start();
    }

   //Test Producer, use your own message producer on a thread to call up
   //broker.insert() possibly passing it the message instead.
    class Producer extends Thread {

        @Override
        public void run() {
            while (true) {
                try {
                    broker.insert();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer extends Thread {
        String myName;
        LinkedBlockingQueue queue;

        Consumer(String m) {
            this.myName = m;
            queue = new LinkedBlockingQueue();
        }

        @Override
        public void run() {
            while(!Thread.interrupted()) {
                try {
                    while (queue.size() == 0 && !Thread.interrupted()) {
                        ;
                    }
                    while (queue.peek() == null && !Thread.interrupted()) {
                        ;
                    }
                    System.out.println("" + myName + " Consumer: " + queue.poll());
                } catch (Exception e) { }
            }
        }
    }

    class Broker {
        public ArrayList consumers = new ArrayList();

        int n;
        int maxSize;

        public Broker(int maxSize) {
            n = 0;
            this.maxSize = maxSize;
        }

        synchronized void insert() throws InterruptedException {
                   //only here for testing don't want it to runaway and 
                    //memory leak, only testing first 100 samples.
            if (n == maxSize)
                wait();
            System.out.println("Producer: " + n++);
            for (Consumer c : consumers) {
                c.queue.add("Message " + n);
            }
        }

    }

    public static void main(String[] args) {
        TestProducerConsumers pc = new TestProducerConsumers(100);

    }
}
0
toegevoegd
Gebruik die while (...) loops NIET in de consument voor druk wachten! Dat is echt slechte codering. Een blokkeerwachtrij zal dit allemaal voor u afhandelen, inclusief de wait() ing in insert() . (Waar is het corresponderende notify() gesprek eigenlijk?)
toegevoegd de auteur JimmyB, de bron
Dus wat zit daar dan aan? If (n == maxSize) wait (); "?
toegevoegd de auteur JimmyB, de bron
U hoeft niet op de inserts te melden omdat de consumenten geen gebruik maken van wait() De 'while's' zijn er om te testen. Ik gooide wat slaap in plaats van wacht bij de consument omdat mijn berichten niet nodig zijn om in realtime te zijn.
toegevoegd de auteur JPM, de bron
Opnieuw testen wilde slechts 100 berichten
toegevoegd de auteur JPM, de bron