Pull to refresh

Лабораторные по многопоточности в Java: Parallel Copy

Reading time 4 min
Views 25K
Хорошие лабораторные по многопоточности (простые, понятные, нетривиальные и полезные в народном хозяйстве) — большая редкость. Предлагаю Вам одно условие и четыре лабораторные работы по элементарной многопоточности на Java.

Также я веду курс «Scala for Java Developers» на платформе для онлайн-образования udemy.com (аналог Coursera/EdX).

Условия


Это реализация однопоточного побайтового копировальщика из InputStream в OutputStream. Копирование происходит в потоке вызвавшем метод copy(...)
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyUtil {
    public static void copy(InputStream src, OutputStream dst)throws IOException{
        try (InputStream src0 = src; OutputStream dst0 = dst) {
            int b;
            while ((b = src.read()) != -1) {
                dst.write(b);
            }
        }
    }
}


Это реализация однопоточного копировальщика массивами из InputStream в OutputStream. Копирование происходит в потоке вызвавшем метод copy(...)
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyUtil {
    public static void copy(InputStream src, OutputStream dst)throws IOException{
        byte[] buff = new byte[128];
        try (InputStream src0 = src; OutputStream dst0 = dst) {
            int count;
            while ((count = src.read(buff)) != -1) {
                dst.write(buff, 0, count);
            }
        }
    }
}


Это реализация многопоточного копировальщика массивами из InputStream в OutputStream. Мы заводим на чтение и на запись по отдельному новому потоку и соединяем их блокирующей ограниченной очередью для передачи данных от читателя к писателю
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

public class CopyUtil {
    public static void copy(final InputStream src, final OutputStream dst) throws IOException {
        // reader-to-writer byte[]-channel
        final BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(64);
        // exception-channel from reader/writer threads?
        final AtomicReference<Throwable> ex = new AtomicReference<>();
        final ThreadGroup group = new ThreadGroup("read-write") {
            public void uncaughtException(Thread t, Throwable e) {ex.set(e);}
        };
        // reader from 'src'
        Thread reader = new Thread(group, () -> {
            try (InputStream src0 = src) {              // 'src0' for auto-closing
                while (true) {
                    byte[] data = new byte[128];        // new data buffer
                    int count = src.read(data, 1, 127); // read up to 127 bytes
                    data[0] = (byte) count;             // 0-byte is length-field
                    buffer.put(data);                   // send to writer
                    if (count == -1) {break;}           // src empty
                }
            } catch (Exception e) {group.interrupt();}  // interrupt writer
        });
        reader.start();
        // writer to 'dst'
        Thread writer = new Thread(group, () -> {
            try (OutputStream dst0 = dst) {      // 'dst0' for auto-closing
                while (true) {
                    byte[] data = buffer.take(); // get new data from reader
                    if (data[0] == -1) {break;}  // its last data
                    dst.write(data, 1, data[0]); // 
                }
            } catch (Exception e) {group.interrupt();}  // interrupt writer
        });
        writer.start();
        // wait to complete read/write operations
        try {
            reader.join(); // wait for reader
            writer.join(); // wait for writer
        } catch (InterruptedException e) {throw new IOException(e);}
        if (ex.get() != null) {throw new IOException(ex.get());}
    }
}


Для проверки корректности копирования можно использовать следующий тест
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;

public class Test {
    public static void main(String[] args) throws IOException {
        Random rnd = new Random(0);
        byte[] testData = new byte[64 * 1024];
        rnd.nextBytes(testData);
        ByteArrayOutputStream dst = new ByteArrayOutputStream();
        CopyUtil.copy(new ByteArrayInputStream(testData), dst);
        if (!Arrays.equals(testData, dst.toByteArray())) {
            throw new AssertionError("Lab decision wrong!");
        } else {
            System.out.println("OK!");
        }
    }
}


Задание #1


В последнем двупоточном решении мы стартуем два потока — для чтения и для записи. Перепишите код, что бы чтение осуществлялось в новом потоке, а запись производилась потоком, вызвавшим copy(...). Кстати, тогда можно будет избавиться от пары join-ов, так как поток на принимающем конце буфера знает, когда закончились данные.

Задание #2


В последнем двупоточном решении читатель постоянно создает новые byte[]-буфера, передает их писателю, а тот отправляет на съедение GC. Создайте отдельную обратную очередь пустых буферов от писателя к читателю.

Задание #3


Во всех трех примерах кода мы реализовывали передачу данных от одного читателя — одному писателю. Реализуйте многопоточное решение передачи данных от одного читателя — многим писателям. Все писатели получают идентичные данные. Читатель и писатели работают каждый в своем отдельном потоке. Не создавайте отдельные копии данных для каждого писателя — пусть писатели читают из одних на всех буферов, но храните эти буфера одновременно в разных очередях (от читателя к каждому писателю тянется отдельная очередь).
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyUtil {
    public static void copy(InputStream src, OutputStream ... dst) throws IOException {
        // some code
    }
}


Задание #4


Сделайте предыдущее задание #3 но образуйте не топологию 'звезда', где в центре читатель и от него исходят лучи к писателям, а топологию 'кольцо'. В которой читатель и писатели выстраиваются в круг и передают буфер по кругу. Читатель — первому писателю, первый писатель — второму,… последний писатель — читателю. И после чего читатель может использовать буфер повторно.

Контакты


Я занимаюсь разработкой курса программирования по Java Core (online-курс).
email: GolovachCourses@gmail.com
skype: GolovachCourses
Tags:
Hubs:
+4
Comments 8
Comments Comments 8

Articles

Information

Website
www.golovachcourses.com
Registered
Founded
Employees
2–10 employees
Location
Украина