Как называется отправка сообщений в программировании
Перейти к содержимому

Как называется отправка сообщений в программировании

  • автор:

Концепция пересылки сообщения

Отдельного пояснения требует понятие обмена сообщениями. Первоначально (например, в том же Smalltalk) взаимодействие объектов представлялось как «настоящий» обмен сообщениями, то есть пересылка от одного объекта другому специального объекта-сообщения. Такая модель является чрезвычайно общей. Она прекрасно подходит, например, для описания параллельных вычислений с помощью активных объектов, каждый из которых имеет собственный поток исполнения и работает одновременно с прочими. Такие объекты могут вести себя как отдельные, абсолютно автономные вычислительные единицы. Посылка сообщений естественным образом решает вопрос обработки сообщений объектами, присвоенными полиморфным переменным — независимо от того, как объявляется переменная, сообщение обрабатывает код класса, к которому относится присвоенный переменной объект. Данный подход реализован в языках программирования Smalltalk, Ruby, Objective-C, Python.

Однако общность механизма обмена сообщениями имеет и другую сторону — «полноценная» передача сообщений требует дополнительных накладных расходов, что не всегда приемлемо. Поэтому во многих современных объектно-ориентированных языках программирования используется концепция «отправка сообщения как вызов метода» — объекты имеют доступные извне методы, вызовами которых и обеспечивается взаимодействие объектов. Данный подход реализован в огромном количестве языков программирования, в том числе C++, Object Pascal, Java, Oberon-2. Однако, это приводит к тому, что сообщения уже не являются самостоятельными объектами, и, как следствие, не имеют атрибутов, что сужает возможности программирования.

Перегрузка операторов

На примере класса комплексных чисел.

friend std::ostream &operator <<(std::ostream &out, Complex A);

Complex operator +(const Complex &c);

Complex operator *(const Complex &c);

Complex operator -(const Complex &c);

Complex operator /(const Complex &c);

Complex operator =(const Complex &c);

Complex operator ++(int x);

Complex operator +=(const Complex &c);

//Сравнение — основной оператор

bool operator ==( Complex const & right)

bool operator !=( Complex const & right)

// Преобразование комплексного числа в целое (not really)

ostream &operator <<(ostream &out, Complex A)

if (A.comp.a == 0 && A.comp.b == 0)

else if (abs(A.comp.a) >= A.delta && abs(A.comp.b) >= A.delta)

else if (abs(A.comp.a) < A.delta)

Complex Complex::operator +(const Complex &c)

slozh.comp.a = comp.a + c.comp.a;

slozh.comp.b = comp.b + c.comp.b;

Complex Complex::operator -(const Complex &c)

vichet.comp.a = comp.a — c.comp.a;

vichet.comp.b = comp.b — c.comp.b;

Complex Complex::operator *(const Complex &c)

umnoj.comp.a = comp.a * c.comp.a — comp.b * c.comp.b;

umnoj.comp.b = comp.a * c.comp.b + comp.b * c.comp.a;

Complex Complex::operator /(const Complex &c)

del.comp.a = (comp.a * c.comp.a + comp.b * c.comp.b) / (comp.a * comp.a + comp.b * comp.b);

del.comp.b = (comp.b * c.comp.a — comp.a * c.comp.b) / (comp.a * comp.a + comp.b * comp.b);

Complex Complex::operator =(const Complex &c)

// Проверка на самоприсваивание

//Постфиксный инкремент,формальный параметр может быть только типа int

Complex Complex::operator ++(int x)

Complex temp = *this;

++*this;// Вызов перегруженного оператора префиксного инкремента

Complex Complex::operator +=(const Complex &c)

cout << «Демонстрация метода — сложение\n»;

cout<<«Комлексные числа A и B равны»<<endl;

cout<<«Комлексные числа A и B не равны»<<endl;

curr_ptr = cmplx_arr[0];// работает оператор индексации

curr_ptr = cmplx_arr[1];// работает оператор индексации

// C=A;// работает оператор присваивания

curr_ptr = cmplx_arr[2];// работает оператор индексации

//Неявное преобразование типа с помощью перегрузки оператора преобразования типа

Как называется отправка сообщений в программировании

Рассмотрим однонаправленную связь между клиентом TcpClient и сервером TcpListener, когда либо сервер посылает данные, а клиент получает, либо, наоборот, клиент отправляет данные, а сервер получает. По сути здесь рассмотриваются те же примеры, что и в случае с сокетами в прошлой статье, только с использованием TcpListener и TcpClient.

Отправка данных клиенту

Сначала рассмотрим ситуацию, когда сервер отправляет данные, а клиент только их получает. Так, определим для сервера следующий код:

В качестве примера просто отправляем клиенту текущее время в формате hh:mm:ss. Для этого конвертируем строку в массив байтов и отправляем их с помощью метода stream.WriteAsync(). А на консоль сервера выводим диагностическое сообщение.

На стороне клиента определим следующий код:

На стороне клиента мы знаем, что сервер отправляет дату в виде строки, и для ее считывания определяем буфер — массив из 512 байтов. С помощью метода stream.ReadAsync() считываем данные из потока, конвертируем байты в строку и выводим ее на консоль.

Запустим сервер и клиент. При обращении к серверу клиент получит текущее время:

А консоль сервера отобразит ip-адрес клиента:

Получение данных от клиента

Теперь рассмотрим другой вид однонаправленной связи между клиентом и серверов, когда, наоборот, клиент отправляет данные, а сервер просто получает. Так, определим для сервера следующий код:

Здесь получаем у tcpClient поток NetworkStream и используем его для считывания данных от клиента. В частности, с помощью метода stream.ReadAsync() считываем данные из потока в массив байтов. В данном случае предполагаем, что данные будут представлять строку. И после получения из байтов строки выводим ее на консоль.

На клиенте определим простейший код для отправки некоторой строки:

В данном случае с помощью метода stream.WriteAsync() отправляем данные серверу. В качестве данных выступает простая строка.

Запустим сервер и клиент. После успешного подключения и отправки данных на консоли клиента мы увидим соответствующее сообщение:

А сервер получит от клиента сообщение и отобразит его на консоли:

Стратегии получения данных

Одна из наиболее сложных частей в клиент-серверном взаимодействии в tcp — это получение данных. И здесь нет однозначного решения, но есть ряд стратегий:

Использование буфера фиксированной длины, когда мы точно знаем, какой именно объем данных будет послан

Отправка в ответе информации о размере ответа, получив которую, нам будет проще считать нужное количество байтов

Использование маркера окончания ответа, получив который, мы завершим считывание данных

Использование любой из этих стратегий подразумевает определение пусть и примитивного но протокола взаимодействия между клиентом и сервером, когда клиент и сервер в соответствии с едиными правилами формируют, отправляют и извлекают данные из потока.

Версия с фиксированным буфером довольно очевидная, поэтому рассмотрим две остальных стратегии.

Использование маркера окончания ответа

Определим следующий код сервера:

Здесь с помощью метода stream.ReadByte() считываем каждый байт из потока. Результатом метода является представление байта в виде int. Допустим, здесь в качестве маркера окончания сообщения будет выступать символ \n или перевод строки, который представляет значение 10. И если встретился перевод строки, то заканчиваем чтение данных и конвертируем полученные данные в строку.

В этом случае клиент должен будет добавлять в конец сообщения символ \n:

Хотя это в какой-то степени несколько примитивное упрощение, поскольку в данном случае мы ограничиваемся отправкой однострочного текста. Но в реальности логика определения конечного маркера может быть более сложной, особенно когда маркер представляет не одним байт/символ, а несколько, но общий принип будет тем же.

Установка размера сообщения

Определим следующий сервер:

В данном случае мы предполагаем, что размер будет представлять значение типа int — то есть значение в 4 байта. Соответственно для считывания размера создаем буфер из 4 байт и считываем его с помощью метода stream.ReadExactlyAsync , который считывает определенное количество байт из потока (в данном случае 4 байта):

Считав размер, мы можем конвертировать его в число int с помощью статического метода BitConverter.ToInt32() , определить буфер соответствующей длины и считать в него данные.

На стороне клиента определим следующий код:

Здесь происходит обратный процесс. Сначала получаем размер сообщения в массив байтов методом BitConverter.GetBytes() :

Отправляем размер в виде четырех байтов и затем отправляем сами данные:

В итоге после отправки клиентом данных консоль сервера отобразит размер данных и сами данные:

Множественная отправка и получение

Выше клиент отправлял, а сервер получал только одно сообщение. Но что, если отправляется множество сообщений, которые сервер должен получать по отдельности. В этом случае нам опять нужно определить какой-нибудь протокол, в соответствии с которым сервер может определить, когда завершается одной сообщение, а когда клиент вообще завершает взавимодействие. Рассмотрим небольшой пример. Пусть сервер будет иметь следующий код:

Итак, здесь наш протокол клиент-серверного взаимодействия состоит из двух правил. Во-первых, каждое отдельное сообщение должно заканчиваться переводом строки \n. Во-вторых, когда клиент хочет завершить взаимодействие и отключиться от сервера, он посылает команду «END». Поэтому в бесконечном цикле обрабатываем все сообщения от клиента, а во вложенном цикле считываем по байту:

Если пришла команда «END», выходит из бесконечного цикла:

Для работы с этим сервером определим следующий клиент:

Здесь каждое отправляемое сообщение оканчивается терминальным символом \n, кроме того, последнее сообщение представляет команду окончания взаимодействия «END». В итоге при подключении этого клиента сервер оторазит на консоли все присланные сообщения, кроме команды «END», которая завершит взаимодействие клиента и сервера:

Name already in use

book / rustbook-ru / src / ch16-02-message-passing.md

  • Go to file T
  • Go to line L
  • Copy path
  • Copy permalink
  • Open with Desktop
  • View raw
  • Copy raw contents Copy raw contents

Copy raw contents

Copy raw contents

Передача данных с помощью сообщений между потоками

Всё большую популярность для обеспечения безопасной многопоточности набирает способ, называемый передача сообщений. В этом случае потоки или акторы взаимодействуют друг с другом путём отправки сообщений с данными. Идея этого подхода выражена в слогане из документации языка Go таким образом: «Не стоит передавать информацию с помощью разделяемой памяти; лучше делитесь памятью, передавая информацию».

Для обеспечения отправки многопоточных сообщений в стандартной библиотеке языка Rust реализованы каналы. Канал в программировании — это общепринятый механизм, с помощью которого данные из одного потока отправляются другому потоку.

https://amdy.su/wp-admin/options-general.php?page=ad-inserter.php#tab-8

Вы можете представить канал в программировании как направленное движение воды, например как ручей или реку. Если вы поместите какую-нибудь вещь на воду, например резиновую уточку, она будет плыть вниз по течению до тех пор, пока это течение не кончится.

Канал состоит из двух половин: передатчика и приёмника. Передатчик — это место вверх по течению, где вы опускаете резиновых уточек в реку, а приёмник — это место, где резиновые уточки оказываются в конце пути. Одна часть вашего кода вызывает методы передатчика с данными, которые вы хотите отправить, а другая часть проверяет принимающую сторону на наличие поступающих сообщений. Канал считается закрытым , если либо передающая, либо принимающая его половина уничтожена.

Давайте создадим программу, в которой один поток будет генерировать значения и отправлять их в канал, а другой поток будет получать значения и распечатывать их. Мы будем отправлять между потоками простые значения, используя канал, чтобы проиллюстрировать эту функцию. После того, как вы ознакомитесь с этим методом, вы сможете использовать каналы с любыми потоками, которым необходимо взаимодействовать друг с другом. Это может быть например система чата или система, в которой несколько вычислительных потоков выполняют свою часть расчёта, а затем отправляют эту часть в отдельный поток, который уже агрегирует полученные результаты.

Сначала в листинге 16-6 мы создадим канал, но не будем ничего с ним делать. Обратите внимание, что этот код ещё не компилируется, потому что Rust не может сказать, какой тип значений мы хотим отправить через канал.

Листинг 16-6: Создание канала и присваивание двух значений переменным tx и rx

Мы создаём новый канал, используя функцию mpsc::channel ; mpsc означает несколько производителей, один потребитель (multiple producer, single consumer). Коротко, способ которым стандартная библиотека Rust реализует каналы, означает, что канал может иметь несколько отправляющих источников генерирующих значения, но только одну принимающую сторону, которая потребляет эти значения. Представьте, что несколько ручьёв втекают в одну большую реку: всё, что плывёт вниз по любому из ручьёв, в конце концов окажется в одной реке. Сейчас мы пока начнём с одного производителя, а когда пример заработает, добавим ещё несколько.

Функция mpsc::channel возвращает кортеж, первый элемент которого является отправляющей стороной (передатчиком), а вторым элементом является принимающая сторона (получатель). Аббревиатуры tx и rx традиционно используются во многих полях для передатчика и приёмника соответственно, поэтому мы называем соответствующие переменные именно так. Мы используем оператор let с шаблоном, который деструктурирует кортежи; мы обсудим использование шаблонов в операторах let и деструктуризацию в главе 18. А пока знайте, что описанное использование оператора let является удобным способом извлечения частей кортежа, возвращаемых mpsc::channel .

Давайте переместим передающую часть в порождённый поток так, чтобы он отправлял одну строку и чтобы таким образом, порождённый поток связывался с основным потоком, как показано в листинге 16-7. Это похоже на то, как если бы вы поместили резиновую утку в реку вверх по течению или отправили сообщение чата из одного потока в другой.

Листинг 16-7: Перемещение tx в созданный поток и отправка сообщения «привет»

Опять же, мы используем thread::spawn для создания нового потока, а затем используем move для перемещения tx в замыкание, чтобы порождённый поток владел tx . Порождённый поток должен владеть передатчиком, чтобы иметь возможность отправлять сообщения через канал. Передатчик имеет метод send , который принимает значение, которое мы хотим отправить. Метод send возвращает тип Result<T, E> , поэтому, если получатель уже удалён и отправить значение некуда, операция отправки вернёт ошибку. В этом примере мы вызываем unwrap для паники в случае ошибки. В реальном приложении мы обработали бы эту ситуацию более корректно: вернитесь к главе 9, если хотите ещё раз разобрать стратегии правильной обработки ошибок.

В листинге 16-8 мы получим значение от приёмника в основном потоке. Это похоже на извлечение резиновой уточки из воды в конце реки или получение сообщения в чате.

Листинг 16-8: В основном потоке получаем сообщение «hi» и печатаем его

Получатель имеет два важных метода: recv и try_recv . Мы используем recv , что является сокращением от receive, который блокирует выполнение основного потока и ждёт, пока данные не будут переданы по каналу. Как только значение будет получено, recv вернёт его в виде Result<T, E> . Когда канал закроется, recv вернёт ошибку, чтобы дать понять, что больше никаких сообщений не поступит.

В свою очередь, метод try_recv не блокирует, а сразу возвращает результат Result<T, E> : значение Ok, содержащее сообщение, если оно доступно или значение Err, если никаких сообщений не поступило. Использование try_recv полезно, если у этого потока есть и другая работа в то время, пока происходит ожидание сообщений: так, мы можем написать цикл, который вызывает try_recv время от времени, обрабатывает сообщение, если оно доступно, а в промежутке выполняет другую работу до того момента, как вновь будет произведена проверка.

Мы использовали recv в этом примере для простоты; у нас нет никакой другой работы для основного потока, кроме как ждать сообщений, поэтому блокировка основного потока уместна.

При запуске кода листинга 16-8, мы увидим значение, напечатанное из основного потока:

Каналы и передача владения

Правила владения играют жизненно важную роль в отправке сообщений, потому что они помогают писать безопасный многопоточный код. Предотвращение ошибок в многопоточном программировании является преимуществом для размышлений о владении во всех ваших Rust программах. Давайте проведём эксперимент, чтобы показать как каналы и владение действуют совместно для предотвращения проблем: мы попытаемся использовать значение val в порождённом потоке после того как отправим его в канал. Попробуйте скомпилировать код в листинге 16-9, чтобы понять, почему этот код не разрешён:

Листинг 16-9: Попытка использовать val после того, как мы отправили его по каналу

Здесь мы пытаемся напечатать значение val после того, как отправили его в канал вызвав tx.send . Разрешить это было бы плохой идеей: после того, как значение было отправлено в другой поток, текущий поток мог бы изменить или удалить значение, прежде чем мы попытались бы использовать значение снова. Потенциально изменения в другом потоке могут привести к ошибкам или не ожидаемым результатам из-за противоречивых или несуществующих данных. Однако Rust выдаёт нам ошибку, если мы пытаемся скомпилировать код в листинге 16-9:

Наша ошибка для многопоточности привела к ошибке компиляции. Функция send вступает во владение своим параметром и когда значение перемещается, получатель становится владельцем этого параметра. Это останавливает нас от случайного использования значения снова после его отправки; анализатор заимствования проверяет, что все в порядке.

Отправка нескольких значений и ожидание получателем

Код в листинге 16-8 компилируется и выполняется, но в нем неясно показано то, что два отдельных потока общаются друг с другом через канал. В листинге 16-10 мы внесли некоторые изменения, которые докажут, что код в листинге 16-8 работает одновременно: порождённый поток теперь будет отправлять несколько сообщений и делать паузу на секунду между каждым сообщением.

Листинг 16-10: Отправка нескольких сообщений и пауза между ними

На этот раз порождённый поток имеет вектор строк, которые мы хотим отправить основному потоку. Мы перебираем их, отправляя каждую строку по отдельности и делаем паузу между ними, вызывая функцию thread::sleep со значением Duration равным 1 секунде.

В основном потоке мы больше не вызываем функцию recv явно: вместо этого мы используем rx как итератор. Для каждого полученного значения мы печатаем его. Когда канал будет закрыт, итерация закончится.

При выполнении кода в листинге 16-10 вы должны увидеть следующий вывод с паузой в 1 секунду между каждой строкой:

Поскольку у нас нет кода, который приостанавливает или задерживает цикл for в основном потоке, мы можем сказать, что основной поток ожидает получения значений из порождённого потока.

Создание нескольких отправителей путём клонирования передатчика

Ранее мы упоминали, что mpsc — это аббревиатура от множество поставщиков, один потребитель . Давайте используем mpsc в полной мере и расширим код в листинге 16.10, создав несколько потоков, которые отправляют значения одному и тому же получателю. Мы можем сделать это, клонировав передатчик, как показано в листинге 16.11:

Листинг 16-11: Отправка нескольких сообщений от нескольких производителей

На этот раз, прежде чем мы создадим первый порождённый поток, мы вызовем функцию clone на передатчике. В результате мы получим новый передатчик, который мы сможем передать первому порождённому потоку. Исходный передатчик мы передадим второму порождённому потоку. Это даст нам два потока, каждый из которых отправляет разные сообщения одному получателю.

Когда вы запустите код, вывод должен выглядеть примерно так:

Вы можете увидеть значения в другом порядке, в зависимости от вашей системы. Именно такое поведение делает параллелизм как интересным, так и сложным. Если вы поэкспериментируете с thread::sleep , задавая различные значения аргумента в разных потоках, каждый запуск будет более недетерминированным и каждый раз будут выводиться разные данные.

Теперь, когда мы посмотрели, как работают каналы, давайте рассмотрим другой метод многопоточности.

Руководство по программированию сокетов на Python. Протокол и сообщения

К концу руководства вы освоите основные функции и методы модуля Python socket, научитесь применять пользовательский класс для отправки сообщений и данных между конечными точками и работать со всем этим в собственных клиент-серверных приложениях. Материалом делимся к старту курса по Fullstack-разработке на Python.

Вам нужны клиент и сервер, в которых ошибки обрабатываются без влияния на другие подключения. Очевидно, клиент или сервер не должны стремительно рухнуть, когда исключение не перехвачено. До сих пор вам не приходилось об этом беспокоиться: ради краткости и ясности обработка ошибок из примеров намеренно исключена.

Ознакомившись с базовым API, неблокируемыми сокетами и .select() , вы можете добавить обработку ошибок и заняться «слоном в комнате», который в примерах скрывался от вас за большим занавесом. Помните тот пользовательский класс, о котором говорилось в самом начале? Он изучается далее.

Сначала устраните ошибки:

Поэтому без перехвата OSError не обойтись. В отношении ошибок важно учитывать ещё и тайм-ауты. О них часто упоминается в документации. Тайм-ауты происходят, это так называемая обычная ошибка: хосты и роутеры перезагружаются, порты коммутаторов выходят из строя, кабели отсоединяются… всего не перечесть. Нужно быть готовым обрабатывать в коде эти и другие ошибки.

А как быть со «слоном в комнате»? Из типа сокета socket.SOCK_STREAM следует, что при использовании TCP данные считываются непрерывным потоком байтов. Это как чтение данных файла на диске, но байты считываются из сети. И, в отличие от чтения файла, здесь нет f.seek() .

То есть нельзя изменить положение указателя сокета, если таковой был, и перемещать данные.

Когда байты поступают в сокет, используются сетевые буферы. После считывания байты нужно где-то сохранить, иначе они будут отброшены. Когда вызывается .recv() , из сокета снова считывается следующий поток доступных байтов.

Данные из сокета будут считываться большими кусками. Таким образом, нужно вызывать .recv() и сохранять данные в буфере, пока не считается объём байтов, достаточный для полного сообщения, которое соответствует приложению.

Определение и отслеживание границ сообщения зависит от вас. В TCP-сокете лишь отправка в сеть и получение из сети необработанных байтов, о значении которых ничего не известно.

Вот почему нужно определить протокол прикладного уровня. Что такое «протокол прикладного уровня»? Если коротко, в приложении отправляются и получаются сообщения, и формат сообщений — это протокол приложения.

То есть длиной и форматом, которые выбираются для этих сообщений, определяются семантика и поведение приложения. Это напрямую связано с тем, чтó вы узнали выше о считывании байтов из сокета. Когда байты считываются с помощью .recv() , нужно следить за тем, сколько их считано, и определять границы сообщения. Один из способов это делать — всегда отправлять сообщения фиксированной длины. Если они всегда одного размера, это несложно. Когда это число байтов считано в буфер, у вас будет одно полное сообщение.

Но сообщения фиксированной длины неэффективны в случае с небольшими сообщениями, где для их заполнения понадобятся отступы. Кроме того, остаётся проблема с данными, которые в сообщении не умещаются.

В этом руководстве вы узнаете об универсальном подходе, применяемом во многих протоколах, включая HTTP. Научитесь предварять сообщения заголовком, в котором указывается длина содержимого, а также любые другие нужные поля. Следить нужно только за заголовком. После считывания заголовок можно обработать, чтобы определить длину содержимого сообщения. С длиной содержимого можно рассчитать число байтов и использовать его.

Для этого напишем пользоваельский класс, где можно отправлять и получать сообщения с текстовыми или двоичными данными. Для целей конкретных приложений этот класс можно расширить и сделать лучше. Самое главное — вы увидите пример, как это делается. Но сначала нужно кое-что узнать о сокетах и байтах. Как вы видели ранее, данные отправляются и получаются через сокеты в виде необработанных байтов.

Если вы получаете данные и хотите использовать их в контексте, где они интерпретируются как несколько байтов, например 4-байтовое целое число, нужно учесть, что оно может быть в формате, ненативном для ЦП вашего компьютера. Клиент или сервер на другом конце может иметь ЦП с порядком байтов, отличным от вашего. Тогда, прежде чем использовать, нужно преобразовать его в порядок байтов вашего хоста.

Это называется «порядком следования байтов». Подробнее о нём см. в справочном разделе. Во избежание этой проблемы применяйте кодировку UTF-8, а в заголовке сообщения — Unicode. В UTF-8 используется 8-битный формат кодирования, поэтому проблем с порядком байтов не возникает.

Объяснение содержится в документации Python «Кодировки и Unicode. Внимание: это относится только к текстовому заголовку. Вы будете использовать явно задаваемый тип и кодировку, определённые в заголовке для отправляемого содержимого — полезной нагрузки сообщения. Так будет возможна передача любых данных (текстовых или двоичных) в любом формате.

Порядок байтов на компьютере легко определить с помощью sys.byteorder . Например, так:

Если запустить это на виртуальной машине с эмуляцией ЦП с обратным порядком байтов (PowerPC), можно увидеть примерно следующее:

В этом примере приложения в протоколе прикладного уровня заголовок определяется в виде текста в «Юникоде» с кодировкой UTF-8. Порядок байтов фактического содержимого сообщения, то есть его полезной нагрузки, в случае необходимости по-прежнему придётся менять вручную.

Это будет зависеть от приложения и от того, нужно ли в нём обрабатывать многобайтовые двоичные данные с компьютера, на котором другой порядок следования байтов. Можно реализовать в клиенте или на сервере поддержку двоичных данных: добавить заголовки для такой же передачи параметров, как по HTTP.

Не переживайте, если это пока непонятно. В следующем разделе вы увидите, как всё это сочетается.

Заголовок протокола приложения

Теперь вы полностью определите заголовок протокола. Заголовок протокола — это:

  • текст нефиксированной длины;
  • Unicode с кодировкой UTF-8;
  • словарь Python, сериализованный с применением JSON.

Вот необходимые заголовки или подзаголовки в словаре заголовка протокола:

Название Описание
byteorder Порядок байтов компьютера (используется sys.byteorder ). Может не требоваться приложению.
content-length Длина содержимого в байтах.
content-type Тип содержимого полезной нагрузки, например text/json или binary/my-binary-type .
content-encoding Применяемая в содержимом кодировка, например utf-8 для текста в «Юникоде» или binary для двоичных данных.

Этими заголовками получатель информируется о содержимом полезной нагрузки сообщения. Благодаря этому можно отправлять произвольные данные, указывая информацию, достаточную для корректного декодирования и интерпретации содержимого получателем. Заголовки находятся в словаре, поэтому при необходимости легко добавляются вставкой пар «ключ — значение».

Отправка сообщения приложения

Есть ещё одна небольшая проблема. Заголовок нефиксированной длины — это сочетание удобства и гибкости, но как узнать его длину при считывании с помощью .recv() ?

Выше уже говорилось о применении .recv() и границах сообщения, и попутно вы узнали, что заголовки фиксированной длины могут быть неэффективными. Это правда, но у вас будет небольшой 2-байтовый заголовок фиксированной длины с указанием длины следующего за ним заголовка в формате JSON.

Это гибридный подход к отправке сообщений: по сути, вы запускаете процесс получения сообщения, предварительно отправляя длину заголовка. Так упрощается восстановление сообщения получателем. Рассмотрим сообщение полностью:

Сообщение приложения с сокетами

Оно начинается с заголовка фиксированной длины в два байта — целого числа в сетевом порядке байтов. Это длина следующего JSON-заголовка нефиксированной длины. Считав два байта с помощью .recv() , вы теперь знаете, что можете обработать их как целое число, а затем считать это количество байтов перед декодированием JSON-заголовка с кодировкой UTF-8.

В JSON-заголовке содержится словарь дополнительных заголовков. Один из них — content-length . Это число байтов содержимого сообщения (без JSON-заголовка). Вызвав .recv() и считав байты content-length , вы достигнете границы сообщения, а значит, оно будет полностью считано.

Класс Message приложения

Ну наконец-то добрались! В этом разделе вы изучите класс Message и его применение с .select() , когда в сокете происходят события чтения и записи.

В примере приложения покажем, какие типы сообщений целесообразно использовать в клиенте и на сервере. Имитированные эхо-клиенты и эхо-серверы остались далеко позади, продемонстрируем теперь работу реального приложения.

Ради простоты в этом примере используется протокол приложения с реализацией базовой функции поиска. От клиента отправляется поисковый запрос, а на сервере выполняется поиск соответствия. Если клиентский запрос не признаётся поисковым, на сервере он принимается за двоичный, и оттуда возвращается двоичный ответ.

Прочитайте следующие разделы и запустите примеры, экспериментируйте с кодом — и вы увидите принципы работы. А затем для начала используете класс Message , подогнав его под свои задачи.

Приложение не сильно отличается от примера клиента и сервера multiconn . В app-client.py и app-server.py остаётся тот же код цикла событий. Вы переместите код сообщения в класс Message и добавите методы для поддержки чтения, записи, а также обработки заголовков и содержимого. Это отличный пример использования класса.

Как вы уже узнали и ещё увидите, работа с сокетами связана с сохранением состояния. Используя класс, вы сохраняете всё состояние, данные и код вместе. Когда подключение инициируется или принимается, для каждого сокета в клиенте и на сервере создаётся экземпляр класса.

Для обёрточных и вспомогательных методов и в клиенте, и на сервере класс по большей части один и тот же. Эти методы начинаются с подчеркивания, например Message._json_encode() . Благодаря им работа с классом упрощается: в других методах поддерживается принцип DRY, они укорачиваются.

Серверный и клиентский классы Message , по сути, идентичны. Различаются они тем, что в клиенте инициируется подключение и отправляется сообщение-запрос, а затем обрабатывается сообщение-ответ с сервера. На сервере наоборот: подключение ожидается, сообщение-запрос клиента обрабатывается, затем отправляется сообщение-ответ.

Выглядит это так:

Этап Конечная точка Действие / содержимое сообщения
1 Клиент Отправляется Message (сообщение) с содержимым запроса
2 Сервер Получается и обрабатывается клиентский запрос Message
3 Сервер Отправляется Message (сообщение) с содержимым ответа
4 Клиент Получается и обрабатывается сообщение-ответ Message с сервера

Вот размещение файлов и кода:

Приложение Файл Код
Сервер app-server.py Основной скрипт сервера
Сервер libserver.py Серверный класс Message
Клиент app-client.py Основной скрипт клиента
Клиент libclient.py Клиентский класс Message
Точка входа в сообщении

Понять принцип работы класса Message может быть непросто из-за, возможно, неочевидного аспекта. Какого? Управления состоянием.

После создания объект Message привязывается к сокету, где с помощью selector.register() отслеживаются события:

Готовые события возвращаются из сокета при помощи selector.select() . Затем можно снова получить ссылку на объект message, используя атрибут data в объекте key , и вызвать метод в Message :

Судя по этому циклу событий, с участием sel.select() происходит многое: блокировка; ожидание событий в верхней части цикла; возобновление цикла, когда события чтения и записи готовы к обработке в сокете. Этим косвенно указывается и на причастность sel.select() к вызову метода .process_events() , который поэтому является точкой входа.

Вот что делается в этом методе:

.process_events() — простой метод, и это хорошо. В нём вызываются только .read() и .write() .

Здесь и появляется управление состоянием. Если бы другой метод зависел от переменных состояния, в которых содержится определённое значение, они бы вызывались только из .read() и .write() . От этого логика предельно упрощается, ведь события поступают в сокет для обработки.

Может возникнуть соблазн применить сочетание методов, которыми проверяются переменные текущего состояния, и в зависимости от их значения вызвать другие методы для обработки данных вне .read() или .write() . В итоге контролировать и отслеживать это, наверное, оказалось бы слишком сложно.

Конечно, класс стóит подогнать под свои нужды, но лучшие результаты вы, наверное, получите при сохранении в методах .read() и .write() проверок состояния и вызовов методов, которые от этого состояния зависят (если это сохранение возможно).

Посмотрите теперь на .read() . Это серверная версия. Впрочем, клиентская такая же, отличается лишь названием метода: .process_response() вместо .process_request() :

Сначала вызывается метод ._read() . И в нём, чтобы считать данные из сокета и сохранить их в буфере приёма, вызывается socket.recv() .

При этом может оказаться, что поступили ещё не все данные, из которых состоит полное сообщение. И socket.recv() потребуется вызвать снова. Вот почему, прежде чем для каждой части сообщения вызывать соответствующий метод её обработки, выполняются проверки состояния.

До обработки своей части сообщения методом проверяется, сколько байтов считано в буфер приёма. Если достаточно, соответствующие байты им обрабатываются, удаляются из буфера, а их вывод записывается в переменную, используемую на следующем этапе обработки. Поскольку сообщение состоит из трёх компонентов, то и проверок состояния, и вызовов метода process тоже три:

Компонент сообщения Метод Вывод
Заголовок фиксированной длины process_protoheader() self._jsonheader_len
JSON-заголовок process_jsonheader() self.jsonheader
Содержимое process_request() self.request

Дальше посмотрите на .write() . Это серверная версия:

Сначала в методе .write() проверяется наличие request (запроса). Если запрос существует, а ответ не создан, вызывается метод .create_response() , в котором задаётся переменная состояния response_created , и в пересылочный буфер записывается ответ. Если в этом буфере имеются данные, в методе ._write() вызывается socket.send() .

При этом может оказаться, что в пересылочном буфере не все данные поставлены в очередь на передачу. Сетевые буферы для сокета могут быть заполнены, и socket.send() потребуется вызвать снова. Для этого и существуют проверки состояния. Метод .create_response() должен вызываться лишь однажды. А вот вызовов ._write() ожидается несколько.

Клиентская версия .write() похожа:

Сначала в клиенте инициируется подключение к серверу и отправляется запрос, поэтому проверяется переменная состояния _request_queued . Если запрос не поставлен в очередь, на нём вызывается метод .queue_request() , в котором этот запрос создаётся и записывается в пересылочный буфер. Кроме того, чтобы метод вызывался только один раз, в нём задаётся переменная состояния _request_queued .

Как и на сервере, если в пересылочном буфере имеются данные, в ._write() вызывается socket.send() .

Заметное отличие клиентской версии .write() — последняя проверка (того, поставлен ли запрос в очередь). «Основной скрипт клиента». Если коротко, ею прекращается отслеживание через selector.select() событий записи в сокете. Если запрос в очереди, а пересылочный буфер пуст, значит, записи закончились — остаются только события чтения. И получать уведомления о том, что сокет доступен для записи, незачем.

В заключение подумайте о главных целях этого раздела: 1) в selector.select() через метод .process_events() вызывается класс Message , и 2) управление состоянием.

Это важно, ведь за время действия подключения .process_events() будет вызываться многократно. Поэтому убедитесь, что переменная состояния проверяется: 1) внутри любых методов, которые должны вызываться лишь однажды, или 2) в источнике вызова, если она задана в методе.

Основной скрипт сервера

В основном скрипте сервера app-server.py аргументы считываются из командной строки, где указываются прослушиваемые интерфейс и порт:

Например, чтобы прослушивать интерфейс «внутренней петли» порта 65432 , введите:

Чтобы прослушивать все интерфейсы, оставляйте в <host> пустую строку.

Создав сокет, выполняем вызов в socket.setsockopt() с параметром socket.SO_REUSEADDR :

Благодаря этому параметру сокета удаётся избежать ошибки Address already in use . Вы увидите это, когда запустите сервер в порте с подключениями в состоянии TIME_WAIT.

Например, если на сервере намеренно закрыть подключение, оно останется в состоянии TIME_WAIT не менее двух минут (продолжительность зависит от ОС). Если до истечения этого времени попробовать запустить сервер снова, из Address already in use получим исключение OSError . Это гарантия того, что любые задержанные пакеты доставляются в сети приложению, которому они предназначены.

В цикле событий отлавливаются любые ошибки, поэтому сервер не «падает» и продолжает запускаться:

Когда клиентское подключение принимается, создаётся объект Message :

Объект Message связан с сокетом в вызове sel.register() и изначально настроен для отслеживания только событий чтения. Считав запрос, вы настроите Message на прослушивание только событий записи.

Каково преимущество такого подхода на сервере? В большинстве случаев, когда сокет исправен и нет проблем с сетью, он всегда доступен для записи.

Дав указание отслеживать через sel.register() ещё и EVENT_WRITE , цикл событий сразу возобновится, и это указание будет в нём отражено. Но сейчас нет причин возобновлять его и вызывать в сокете .send() , ведь отправлять нечего: ответа нет, потому что запрос ещё не обработан. При отправке расходовались бы впустую ценные циклы ЦП.

Серверный класс Message

В разделе «Точка входа в сообщении» вы узнали, как через .process_events() вызывается объект Message , когда события в сокете готовы. А что происходит, когда данные в сокете считываются и компонент или фрагмент сообщения готов к обработке на сервере?

Серверный класс Message находится в libserver.py нашего исходного кода. Код также можно загрузить по ссылке ниже:

Методы в классе располагаются в порядке обработки сообщения.

Заголовок фиксированной длины может обрабатываться, когда на сервере считано минимум два байта:

Заголовок фиксированной длины — это 2-байтовое целое число в сетевом или обратном порядке байтов, включая длину JSON-заголовка. Чтобы считать, декодировать и сохранить значение в self._jsonheader_len , вы будете использовать struct.unpack(). После обработки части сообщения, соответствующей этому значению, последнее удаляется из буфера приёма с помощью .process_protoheader() .

Как и заголовок фиксированной длины, JSON-заголовок может обрабатываться, когда в буфере приёма для него достаточно данных:

Чтобы выполнить декодирование и десериализацию JSON-заголовка в словарь, вызывается метод self._json_decode() . Этот заголовок определён как Unicode с кодировкой UTF-8, поэтому в вызове жёстко задана utf-8 . Результат сохраняется в self.jsonheader . После обработки соответствующей части сообщения результат удаляется из буфера приёма методом process_jsonheader() .

Переходим к фактическому содержимому — полезной нагрузке сообщения. Оно описывается в self.jsonheader JSON-заголовка. Запрос может обрабатываться, когда в буфере приёма доступны байты content-length :

После сохранения сообщения в переменной data , оно удаляется из буфера приёма при помощи .process_request() . Затем в том же методе выполняются декодирование и десериализация. Это если тип содержимого — JSON. А если нет, оно принимается за двоичный запрос, и в этом примере тип содержимого приложения просто печатается.

Наконец, с помощью .process_request() селектор настраивается на отслеживание только событий записи. В основном скрипте сервера app-server.py сокет изначально настроен на отслеживание только событий чтения. Но они вас больше не интересуют, ведь запрос обработан полностью.

Теперь можно создать ответ и записать в сокет. Когда сокет доступен для записи, из .write() вызывается .create_response() :

Ответ создаётся путём вызова других методов, в зависимости от типа содержимого. В этом примере приложения выполняется простой поиск запросов JSON по словарю, когда action == ‘search’ . Для своих приложений вы можете определить другие вызываемые здесь методы.

После создания сообщения-ответа, чтобы .create_response() не вызывался из .write() повторно, задаётся переменная состояния self.response_created . Наконец, ответ добавляется в пересылочный буфер. Увидеть и отправить его можно через ._write() .

Остаётся разобраться, как после записи ответа закрыть подключение. Вызов .close() можно поместить в метод ._write() :

Хотя .close() чуть спрятан, с учётом того, что в классе Message обрабатывается лишь одно сообщение на подключение, это приемлемый компромисс. После записи ответа работа сервера завершена.

Основной скрипт клиента

В основном скрипте клиента app-client.py аргументы считываются из командной строки и используются для создания запросов и инициирования подключений к серверу:

Создав словарь, — запрос из аргументов командной строки, передаём его вместе с хостом и портом в .start_connection() :

Для подключения к серверу создаётся сокет, а также объект Message с использованием словаря request .

Как и на сервере, объект Message связан с сокетом в вызове sel.register() . Но в клиенте сокет изначально настроен для отслеживания событий чтения и записи. Записав запрос, вы настроите сокет на прослушивание только событий чтения.

У этого подхода то же преимущество, что и на сервере: не тратятся впустую циклы ЦП. После того как запрос отправлен, события записи вас не интересуют — возобновлять цикл событий и обрабатывать их больше незачем.

Клиентский класс Message

В разделе «Точка входа в сообщении» вы узнали, как через .process_events() вызывается объект message, когда события в сокете готовы. А что происходит после того, как данные в сокете считаны и записаны, а сообщение готово к обработке в клиенте?

Клиентский класс Message находится в libclient.py , части загруженного вами ранее исходного кода. Код также можно загрузить по ссылке ниже:

Методы в классе располагаются в порядке обработки сообщения.

Первая задача в клиенте — это постановка запроса в очередь:

Словари, используемые для создания запроса, в зависимости от того, чтó передано в командной строке, находятся в основном скрипте клиента, app-client.py . Когда создаётся объект Message , словарь-запрос передаётся в класс как аргумент.

Сообщение-запрос создаётся и добавляется в пересылочный буфер, который затем можно увидеть и отправить через ._write() . Чтобы .queue_request() не вызывался повторно, задаётся переменная состояния self._request_queued .

После отправки запроса клиент ожидает ответа от сервера.

Методы чтения и обработки сообщения в клиенте те же, что и на сервере. Когда с сокета считываются данные ответа, вызываются .process_protoheader() и .process_jsonheader() — методы заголовка process .

Отличаются они именованием окончательных методов process и тем, что ответ в них не создаётся, а обрабатывается: .process_response() , ._process_response_json_content() и ._process_response_binary_content() .

И последнее (но, конечно, не менее важное) — финальный вызов .process_response() :

Ещё кое-что важное о классе Message

Завершая знакомство с классом Message , стóит обратить внимание — при помощи вспомогательных методов — на ещё кое-что важное.

Любые вызванные в классе исключения отлавливаются в основном скрипте — в инструкции except внутри цикла событий:

Очень важна строка с message.close() : здесь сокет не только закрывается, но и прекращает отслеживаться через .select() . Код в классе сильно упрощается. Если есть исключение или вы сами явно его вызываете, .close() выполнит очистку.

В методах Message._read() и Message._write() тоже есть кое-что интересное:

Обратите внимание на строку except BlockingIOError: .

В методе ._write() она тоже имеется. Это важные строки: в них отлавливается временнáя ошибка и с помощью pass пропускается. Такая ошибка возникает, когда сокет блокируется, например при ожидании в сети или на другом конце подключения, то есть в его одноранговом узле.

После того как исключение отловлено и пропущено с помощью pass , в .select() в итоге запустится новый вызов, и вы получите ещё одну возможность считать или записать данные.

Запуск клиента и сервера приложения

Мы неплохо потрудились, теперь займёмся поиском!

В этих примерах вы запустите сервер и, передав пустую строку для аргумента host , прослушаете все интерфейсы. Так будет возможен запуск клиента и подключение с виртуальной машины другой сети — эмуляции машины PowerPC с обратным порядком байтов.

Сначала запустите сервер:

Теперь запустите клиент и начните поиск. Посмотрим, сможете ли вы его найти:

Вы могли заметить, что в терминале запускается оболочка, в которой применяется кодировка текста Unicode (UTF-8), поэтому показанный выше вывод хорошо получается с эмодзи.

Посмотрим теперь, сможете ли вы найти щенков:

Обратите внимание на строку с sending . Здесь по сети отправлена строка байтов для запроса. Её проще увидеть, если искать байты, выводимые в шестнадцатеричном формате, которым обозначаются эмодзи щенка: \xf0\x9f\x90\xb6 . Если в вашем терминале применяется Unicode с кодировкой UTF-8, вы сможете ввести эмодзи для поиска.

Этим показывается, что вы отправляете по сети необработанные байты, которые для корректной их интерпретации должны быть декодированы получателем. Потому вы и пошли на все эти хлопоты по созданию заголовка с указанием типа содержимого и кодировки.

Вот вывод сервера на обоих показанных выше клиентских подключениях:

В строке с sending обозначены байты, записанные в сокет клиента. Это сообщение-ответ с сервера.

Проверьте отправку на сервер двоичных запросов, когда аргумент action отличается от search :

Поскольку content-type запроса не text/json , на сервере он принимается за пользовательский двоичный тип, и декодирование JSON не выполняется. А просто выводится content-type , и клиенту возвращаются первые 10 байт:

Научим вас аккуратно работать с данными, чтобы вы прокачали карьеру и стали востребованным IT-специалистом. Новогодняя акция — скидки до 50% по промокоду HABR:

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *