Skip to content

mambaru/wfc_io

Repository files navigation

tag Build Status Build Status Build Status Build Status codecov

wfc_io: Пакет сетевых модулей

Включает модули:

  • server-tcp
  • server-udp
  • client-tcp
  • io-queue -
  • statistics - сбор статистики

server-tcp

Минимальная конфигурация:

{
  "server-tcp": [
    {
      "name": "server-tcp1",
      "port": "12345"
    }
  ]
}

В этой конфигурации сервер открывает порт на localhost, акцептит подключения, но сбрасывает их при попытке отправить ему данные. Для полноценной работы необходимо указать цель, которая будет обрабатывать входящий поток данных. Это, как правило, jsonrpc-сервис прикладного модуля или вспомогательные io- и jsonrpc-модули. В следующем примере входящие сообщения отправляются в сыром виде hash-объекту из демо примера:

{
  "server-tcp": [
    {
      "name": "server-tcp1",
      "target": "hash1",
      "addr": "0.0.0.0",
      "port": "12345"
    }
  ],
  "hash": [
    {
      "name": "hash1"
    }
  ]
}

server-tcp поддерживает динамическую реконфигурацию, но при реконфигурировании все текущие соединения сбрасываются. В suspend-режиме работает как эхо-сервис. Если включена динамическая реконфигурация и изменен только флаг suspend, то изменения вступают в силу только для новых подключений, а текущие продолжают работать.

По умолчанию архитектура однопоточная (thrеads=0, работает в основном потоке приложения), при увеличении числа потоков работает в режиме акцептор-на-поток, соединение работает в том же потоке, где и получило акцепт. При высоких нагрузках рекомендуется выделить хотя-бы один поток (thrеads=1) в этом режиме сервер так же однопоточный, но работает в отдельном потоке, и не захламляет заданиями основную очередь приложения, где крутятся таймеры и пр. Если используются очереди, то выделять большое количество потоков для сервера не имеет смысла, лучше увеличить число потоков для разбора очередей (см. workflow).

Если время ответа прикладной логики стабильно невысокое и клиент работает в режиме подключение на запрос, то использование очередей может не дать профита, а только увеличит нагрузку на CPU. Для постоянных подключений лучше использовать очередь. Если профиль нагрузки смешанный, то можно сделать два объекта сервера на разных портах, и по разному сконфигурировать последовательность обработки.

Конфигурация:

{
  "server-tcp": [
    {
      "name": "server-tcp1",
      /* В отключенном режиме сервер не создается*/
      "enabled": true,
      /* В режиме suspend работает как эхо-сервер */
      "suspend": false,
      /* Приоритет запуска. Укажите минимальный (максимальное значение), если порты необходимо открыть
         после запуска остальных объектов */
      "startup_priority": 0,
      /* Приоритет остановки. Укажите максимальный (минимальное значение), если порты необходимо закрыть 
         в первую очередь */
      "shutdown_priority": 0,

      /* Номера ядер CPU для потоков сервера, например [1,2,3] */
      "cpu": [],
      
      /* Цель - куда передать полученные данные */
      "target": "",
      
      /* По умолчанию использовать разделитель `\r\n` для сообщений. 
         Если false, то настраивается в connection */
      "rn": true,
      
      /* Не закрывать соединение после отправки ответа */
      "keep_alive": true,
      
      /* Число потоков. 0-в основном потоке приложения, 1-в отдельном потоке, N-акцептор на поток*/
      "threads": 0,
      
      /* Адрес */
      "addr": "",
      /* Порт */
      "port": "",
      /* Системная очередь новых соединений */
      "backlog": 1024,
      /* Максимальное количество одновременных подключения. 0 - без ограничения */
      "max_connections": 0,
      
      /* Настройки соединения */
      "connection": {
        /* В режиме direct_mode=true отключается регистрация нового соединения в целевом объекте.
           Работает быстрее, но не все целевые объекты поддерживают такой режим, например очередь 
           c tracking=true () */
        "direct_mode": false,
        
        /* Размер буфера приема сообщений сокета (SOL_SOCKET/SO_RCVBUF). 0 - не изменять */
        "receive_buffer_size": 0,
        
        /* Размер буфера отправки сокета (SOL_SOCKET/SO_SNDBUF). 0 - не изменять */ */
        "send_buffer_size": 0,
        
        "reader": {
          /* Разделитель входящих сообщений. Если задан, то поле rn-игнорируется и используется
             это значение. Если не задан (="") и если rn=true, то это эквивалентно sep="\r\n".
             Если не задан (="") и если rn=false, то без разделения, для двоичных данных. */
          "sep": "",
          /* Рекомендуемый размер приемного буфера (в бинарном режиме фиксированный)*/
          "bufsize": 4096,
          /* Максимальный рекомендуемый размер приемного буфера */
          "maxbuf": 8192,
          /* Минимальный рекомендуемый размер приемного буфера */
          "minbuf": 0,
          /* Максимальный размер входящего сообщения если указан разделитель. 0-без ограничений.*/
          "maxsize": 0,
          /* Удалять разделитель из */
          "trimsep": true
        },
        
        "writer": {
          /* Разделитель исходящих сообщений. Просто добавляет его в конец сообщения.
             Если задан, то поле rn-игнорируется и используется это значение.
             Если не задан (="") и если rn=true, то это эквивалентно sep="\r\n".
          */
          "sep": "",
          /* Размер исходящего буфера. Если сообщение превышает maxbuf, то оно разбивается на 
             несколько буферов размеров bufsize, для оптимизации отправки */
          "bufsize": 8192,
          /* Максимальный размер буфера, до его разбиения */
          "maxbuf": 8192,
          /* Минимальный размер буфера. Если текущий меньше этого значения, то он объединится 
             со следующим*/
          "minbuf": 0,
          /* Максимальный размер исходящего сообщения. Если 0, то предполагается равным maxbuf */
          "maxsize": 0,
          /* Попытка отправить сообщение превышающее _maxbuf с первой попытки, а только потом 
             разбить оставшиеся. Если возможна отправка сообщений более 1МБ, то рекомендуется 
             установить first_as_is=false, чтобы гарантировано разбить большое сообщения, для максимально 
             эффективной записи в сокет */
          "first_as_is": true
        }
      }
    }
  ]
}

Значения по умолчанию для размеров буферов reader и writer подобраны оптимально, для любых размеров сообщений. Но вы можете уменьшить их, если основной поток из сообщений небольшого размера. Увеличивать размеры больше 64К не имеет смысла для любых размеров сообщений.

Когда клиент подключается к серверу, то тот создает новый объект подключения, о чем уведомляется целевой объект (target). После отключения клиента, объект уничтожается, о чем так же происходит уведомление. Эти уведомления можно отключить установив direct_mode=true, если целевые объекты их не обрабатывают или могут работать без них. Например очереди по этим уведомлениям выкидывают сообщения для клиентов, которые закрыли подключение. Если включить direct_mode=true, то такой возможности уже не будет, и сообщение будет обработано, даже если получатель ответа уже отключился. Однако, если отключений клиентов не происходит (как правило по таймауту), то можно немного снизить нагрузку на CPU включив direct_mode.

Для jsonrpc конфигурация включает как минимум три компонента:

  • server-tcp или server-udp
  • jsonrpc-сервис - компонент предоставляемый разработчиком, например hash-service. Он преобразует jsonrpc-сообщения в вызовы API прикладного объекта
  • Прикладной объект, где реализована вся логика, например hash.
┌─────────────┐  ┌───────────────┐  ┌───────┐
│ server-tcp  ├──┤ hash-service  ├──┤ hash  │
└─────────────┘  └───────────────┘  └───────┘
{
  "server-tcp": [
    {
      "name": "server-tcp1",
      "threads": 1,
      "target": "hash-service1",
      "addr": "0.0.0.0",
      "port": "12346"
    }
  ],
  "hash-service": [
    {
      "name": "hash-service1",
      "target": "hash1"
    }
  ],
  "hash": [
    {
      "name": "hash1"
    }
  ]
}

server-udp

Для UDP сервера настроек меньше, но оставшиеся совпадают по описанию с TCP, но нужно иметь ввиду, что это датаграмный протокол, и разбиение на сообщения происходит только внутри одной датаграмы.

{
  "server-udp": [
    {
      "name": "server-udp1",
      "enabled": true,
      "suspend": false,
      "startup_priority": 0,
      "shutdown_priority": 0,
      "cpu": [],
      "target": "",
      "rn": true,
      "threads": 0,
      "addr": "",
      "port": "",
      "receive_buffer_size": 0,
      "send_buffer_size": 0,
      "reader": {
        "sep": "",
        "bufsize": 4096,
        "maxbuf": 8192,
        "minbuf": 0,
        "maxsize": 0,
        "trimsep": true
      },
      "writer": {
        "sep": "",
        "bufsize": 8192,
        "maxbuf": 8192,
        "minbuf": 0,
        "maxsize": 0,
        "first_as_is": true
      }
    }
  ]
}

Важно! Значения по умолчанию для reader/writer для датаграм 4КБ, возможно вам будет необходимо увеличить bufsize до максимального возможного размера датаграм 64КБ. При высоких нагрузках рекомендуется также задать receive_buffer_size, например:

"server-udp": [
  {
    "name": "server-udp1",
    "startup_priority":  1000,
    "shutdown_priority": -1000,
    "addr": "0.0.0.0",
    "port": "38000",
    "target": "service1",
    "reader": { "bufsize": 65535},
    "writer": { "bufsize": 65535},
    "receive_buffer_size": 128000000,
    "threads": 16
  }
]

Обычно UDP-порт открывают одновременно с TCP, например:

┌─────────────┐                           
│ server-tcp  ├─┐                         
└─────────────┘ │   ┌───────────────┐   ┌───────┐
                ├───┤ hash-service  ├───┤ hash  │
┌─────────────┐ │   └───────────────┘   └───────┘
│ server-udp  ├─┘
└─────────────┘
{
  "server-tcp": [
    {
      "name": "server-tcp1",
      "threads": 1,
      "target": "hash-service1",
      "addr": "0.0.0.0",
      "port": "12346"
    }
  ],
  "server-udp": [
    {
      "name": "server-udp1",
      "threads": 1,
      "target": "hash-service1",
      "addr": "0.0.0.0",
      "port": "12346"
    }
  ],
  "hash-service": [
    {
      "name": "hash-service1",
      "target": "hash1"
    }
  ],
  "hash": [
    {
      "name": "hash1"
    }
  ]
}

io-queue

Все настройки связанные с управлением очередями осуществляются в компоненте workflow, имя которого необходимо указать в поле "workflow". Таким образом несколько очередей могут обрабатываться одним пулом потоков. Обратную очередь имеет смысл включать если ответные сообщения могут иметь большой размер. Поле tracking позволяет включить режим отслеживания отключения клиента,

{
  "io-queue": [
    {
      "name": "io-queue1",
      "enabled": true,
      "suspend": false,
      "tracking": false,
      "workflow": "",
      "target": "",
      "callback_queue": false,
      "callback_workflow": ""
    }
  ]
}

Если у вас система настроена на работу с jsonrpc, то использовать этот компонент не оптимально, jsonrpc-queue больше для этого подходит. Пример для общей очереди:

┌─────────────┐
│ server-tcp  ├─┐
└─────────────┘ │     ┌──────────┐    ┌───────────────┐   ┌───────┐
                ├─────┤ io-queue ├────┤ hash-service  ├───┤ hash  │
┌─────────────┐ │     └──────────┘    └───────────────┘   └───────┘
│ server-udp  ├─┘   common workflow
└─────────────┘
{
  "server-tcp": [
    {
      "name": "server-tcp1",
      "target": "queue1",
      "addr": "0.0.0.0",
      "port": "12346"
    }
  ],
  "server-udp": [
    {
      "name": "server-udp1",
      "target": "queue1",
      "addr": "0.0.0.0",
      "port": "12346"
    }
  ],
  "io-queue": [
    {
      "name": "queue1",
      "target": "hash-service1"
    }
  ],
  "hash-service": [
    {
      "name": "hash-service1",
      "target": "hash1"
    }
  ],
  "hash": [
    {
      "name": "hash1"
    }
  ]
}

Еще пример:

                  workflow1
┌─────────────┐  ┌──────────┐
│ server-tcp  ├──┤ io-queue ├─┐
└─────────────┘  └──────────┘ │  ┌───────────────┐   ┌───────┐
                              ├──┤ hash-service  ├───┤ hash  │
┌─────────────┐  ┌──────────┐ │  └───────────────┘   └───────┘
│ server-udp  ├──┤ io-queue ├─┘
└─────────────┘  └──────────┘
                  workflow2
{
  "server-tcp": [
    {
      "name": "server-tcp1",
      "target": "queue1",
      "addr": "0.0.0.0",
      "port": "12346",
      "threads": 1
    }
  ],
  "server-udp": [
    {
      "name": "server-udp1",
      "target": "queue2",
      "addr": "0.0.0.0",
      "port": "12346",
      "threads": 1
    }
  ],
  "io-queue": [
    {
      "name": "queue1",
      "target": "hash-service1",
      "tracking": true,
      "workflow":"workflow1"
    },
    {
      "name": "queue2",
      "target": "hash-service1",
      "workflow":"workflow1"
    }
  ],
  "hash-service": [
    {
      "name": "hash-service1",
      "target": "hash1"
    }
  ],
  "hash": [
    {
      "name": "hash1"
    }
  ],

  "workflow": [
    {
      "name": "workflow1",
      "threads": 1,
      "maxsize": 10
    },
    {
      "name": "workflow2",
      "threads": 5
    },
  ]
}

io-broker

Позволяет управлять входным потоком с помощью регулярных выражений. Для jsonrpc лучше подойдет jsonrpc-broker

{
  "io-broker": [
    {
      "name": "io-broker1",
      "enabled": true,
      /* В режиме suspend отправляет поток в **target**  */
      "suspend": false,
      /* Цель по умолчанию (если пустая, то сообщение выкидывается из очереди ) */
      "target": "",
      /* Имя лога (не путь), если хотите чтобы сообщение писалось еще и туда */
      "target_log": "",
      "rules": [
        {
          /* Цель для правила */
          "target": "",
          /* Имя лога (не путь), если хотите чтобы сообщение писалось еще и туда, для этой цели */
          "rule_log": "",
          /* Регулярное выражение */
          "regex": ""
        }
      ]
    }
  ]
}

Пример:

{
  "io-broker": [
    {
      "name": "io-broker1",
      "rules": [
        {
          "target": "target1",
          "rule_log": "rule1",
          "regex": ".*"
        }
      ]
    }
  ]
}

Пример когда jsonrpc-запросы и остальные распределяются по разным очередям.

                                workflow1
┌─────────────┐                ┌──────────┐non jsonrpc
│ server-tcp  ├─┐            ┌─┤ io-queue ├─┐
└─────────────┘ │ ┌────────┐ │ └──────────┘ │  ┌──────────────┐   ┌──────┐
                ├─┤ broker ├─┤              ├──┤ hash-service ├───┤ hash │
┌─────────────┐ │ └────────┘ │ ┌──────────┐ │  └──────────────┘   └──────┘
│ server-udp  ├─┘            └─┤ io-queue ├─┘
└─────────────┘                └──────────┘jsonrpc
                                workflow2
{
  "server-tcp": [
    {
      "name": "server-tcp1",
      "target": "broker1",
      "addr": "0.0.0.0",
      "port": "12346",
      "threads": 1
    }
  ],
  "server-udp": [
    {
      "name": "server-udp1",
      "target": "broker1",
      "addr": "0.0.0.0",
      "port": "12346",
      "threads": 1
    }
  ],
  "io-broker": [
    {
      "name": "broker1",
      "target":"queue1",
      "log":"BRLOG1",
      "rules": [
        {
          "target":"queue2",
          "regex":"{.*}$",
          "log":"BRLOG2"
        }
      ]
    }
  ],
  "io-queue": [
    {
      "name": "queue1",
      "target": "hash-service1",
      "tracking": true,
      "workflow":"workflow1"
    },
    {
      "name": "queue2",
      "target": "hash-service1",
      "workflow":"workflow2"
    }
  ],
  "hash-service": [
    {
      "name": "hash-service1",
      "target": "hash1",
      "allow_non_jsonrpc":true
    }
  ],
  "hash": [
    {
      "name": "hash1"
    }
  ],
  "workflow": [
    {
      "name": "workflow1",
      "threads": 1,
      "maxsize": 10
    },
    {
      "name": "workflow2",
      "threads": 5
    },
  ]
}

client-tcp1

Устанавливает постоянное подключение с другим сервером для каждого потребителя (прикладной объект).

{
  "client-tcp": [
    {
      "name": "client-tcp1",
      "enabled": true,
      "suspend": false,
      "startup_priority": 0,
      "shutdown_priority": 0,
      /* Workflow для таймеров повторного подключения */
      "workflow": "",
      "rn": true,
      /* Завершить приложения при ошибках сети */
      "abort_if_error": true,
      /* Пул клиентов при старте приложения */
      "startup_pool": 1,
      /* Постоянный пул клиентов */
      "primary_pool": 0,
      /* Вторичный пул клиентов */
      "secondary_pool": 0,
      "addr": "",
      "port": "",
      /* Количество подключений на клиент */
      "connect_count": 1,
      /* Потоков на клиент */
      "threads": 0,
      /* Асинхронное подключение */
      "async_connect": false,
      /* Тамаут повторного подключения в миллисекундах */
      "reconnect_timeout_ms": 1000,
      
      /* Настройки аналогично TCP серверу */
      "connection": {
        "receive_buffer_size": 0,
        "send_buffer_size": 0,
        "reader": {
          "sep": "",
          "bufsize": 4096,
          "maxbuf": 8192,
          "minbuf": 0,
          "maxsize": 0,
          "trimsep": true
        },
        "writer": {
          "sep": "",
          "bufsize": 8192,
          "maxbuf": 8192,
          "minbuf": 0,
          "maxsize": 0,
          "first_as_is": true
        }
      }
    }
  ]
}

Стандартная схема, на примере workflow где прикладной объект demo удаленно, по jsonrpc, обращается к объекту hash:

┌───────────┐  ┌──────────────┐
│server-tcp ├──┤ demo-service ├┐
└───────────┘  └──────────────┘│
 ┌─────────────────────────────┘
 │┌───────┐  ┌──────────────┐  ┌─────────────┐    |    ┌─────────────┐  ┌──────────────┐  ┌───────┐
 └┤ demo  ├──┤ hash-gateway ├──┤ client-tcp  ├ - -|- ->┤ server-tcp  ├──┤ hash-service ├──┤ hash  │
  └───────┘  └──────────────┘  └─────────────┘    |    └─────────────┘  └──────────────┘  └───────┘

Для такой схемы, настроек пула подключений (startup_pool, primary_pool, secondary_pool) по умолчанию вполне достаточно, так как к клиенту подключается один экземпляр объекта. Клиент может сделать несколько постоянных подключений (connect_count=N*) и отправлять исходящие сообщения по очереди по каждому из них (эффективность такого решения сильно зависит от архитектуры удаленного сервера)

Если объектов использующих client-tcp счетное количество, то достаточно указать это число startup_pool=X, тогда при старте будет создано X экземпляров клиентов, с connect_count=N* подключений и threads * X потоков обработки. (на каждый экземпляр создается свой пул потоков)

Но также появилась возможность подключать server-tcp напрямую к клиенту или через io-объкты (io-queue, io-broker). В этом случае для каждого подключения сервера создается свой клиент. Для того, чтобы не блокировать сервер блокируемым подключением, сделаете его неблокируемым (async_connect=true), и настройте пул клиентов:

  • startup_pool - пул который будет заполнен при старте, и из него будут извлекаться подключенные объекты, до тех пор пока он не опустеет. Далее не используется.
  • primary_pool - пул постоянно поддерживает заданное количество подключенных объектов, при извлечении автоматически в фоне создает новое. Из этого пула извлекаются объекты в последнюю очередь.
  • secondary_pool - изначально пустой пул в который возвращаются отработанные объекты. Сначала извлекаются объекты из startup_pool, потом отсюда и только потом primary_pool. Отработанные клиенты из primary_pool также помещаются сюда

About

Network package for WFC (wamba framework cpp)

Resources

License

Stars

Watchers

Forks

Packages

No packages published