Subject: unsolicited data is not queued to the buffer. Am i reading a wrong stream ?

unsolicited data is not queued to the buffer. Am i reading a wrong stream ?

From: Sten Kultakangas via libssh2-devel <libssh2-devel_at_cool.haxx.se>
Date: Sat, 1 Jul 2017 19:51:46 +0300

Hello

I am using libssh2 with libevent in my application and i ran into the
following issue.

I do not receive all data that is supposed to come through the terminal.
Some of that data can be noticed in libssh2 debug output:

=> libssh2_transport_read() plain (158 bytes)
0000: 5E 00 00 00 00 00 00 00 95 0A 0D 0A 0D 53 79 73 : ^............Sys
0010: 74 65 6D 3A 20 4C 61 72 67 65 20 20 20 20 20 20 : tem: Large
0020: 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 :
0030: 20 20 53 6F 66 74 77 61 72 65 20 56 65 72 73 69 : Software Versi
0040: 6F 6E 3A 20 52 30 31 36 78 2E 30 33 2E 30 2E 31 : on: R016x.03.0.1
0050: 32 34 2E 30 1B 5B 42 0A 0D 54 65 72 6D 69 6E 61 : 24.0.[B..Termina
0060: 6C 20 54 79 70 65 20 28 35 31 33 2C 20 37 31 35 : l Type (513, 715
0070: 2C 20 34 34 31 30 2C 20 34 34 32 35 2C 20 56 54 : , 4410, 4425, VT
0080: 32 32 30 2C 20 4E 54 54 2C 20 57 32 4B 54 54 2C : 220, NTT, W2KTT,
0090: 20 53 55 4E 54 29 3A 20 5B 35 31 33 5D 20 : SUNT): [513]
[libssh2] 3.686087 Transport: Packet type 94 received, length=158

Only terminal sequence c [1;24r [1;1H [0J followed by "System: Large
                 Software Version: R016x.03.0.124.0 [B" was delivered to my
application,

My application calls libssh2_channel_read_ex until evbuffer_remove returns
nothing. This approach worked in all my network application. I tried
streams 0 and 1. When i am reading stream 1, the applications does not
receive any data from sshd.

I opened normal ssh client and logged in to the system without choosing the
terminal type. Then i opened another ssh client process and logged to the
system, choiced the terminal type and issued the "wall" command. The
contents of the broadcast message was displayed in both instances of the
normal ssh client.

However, the application does not receive that data even according to
libssh2_transport_read
debug output.

So there are at least two questions:
1) why only "System: Large Software Version:
R016x.03.0.124.0 [B" was delivered to my application, but not "Software
Version: R016x.03.0.124.0.[B..Terminal Type (513, 715, 4410, 4425, VT220,
NTT, W2KTT, SUNT): [513]" ? The latter was also present in libssh2 debug
output.
2) why unsolicited "wall" messages are not delivered neither through
standard stream nor through stderr stream ? Is there any special stream
that i'm missing? How to get the list of all available streams ?

Last time when i ran the application i got the following output:

16:34:57.724 BCMSMonitor::OnReadLine: Data: System: Large
 Software Version: R016x.03.0.124.0←[B
16:34:57.724 BCMSMonitor::OnReadLine: Data:
16:34:57.724 GenericClient::SSH_read_channel: Calling
libssh2_channel_read_ex() with 8123
[libssh2] 3.725187 Conn: channel_read() wants 8123 bytes from channel 0/0
stream #0
16:34:57.725 GenericClient::SSH_recv: read operation result: 0, wanted
16384 bytes

which indicates that "recv" notifications on the socket were re-enabled.
However, sshd server is still not sending unsolicited data to my
application although it is is sending it to normal ssh client.

Here's the relevant code:

ssize_t GenericClient::SSH_recv(
libssh2_socket_t fd,
void *buf,
size_t len,
int,
void **)
{
auto ctx = reinterpret_cast<GenericClient *>(fd);

Scope;
if(!ctx->ssh_input) {
Trace << "someone wanted to read " << len << " bytes too early";
return -EAGAIN;
}
auto rc = evbuffer_remove(ctx->ssh_input, buf, len);
Trace << "read operation result: " << rc << ", wanted " << len << " bytes";
if(rc < 0) return -EAGAIN;
if(rc == 0 && len != 0) return -EAGAIN;
return rc;
}

ssize_t GenericClient::SSH_send(
libssh2_socket_t fd,
const void *buf,
size_t len,
int,
void **)
{
auto ctx = reinterpret_cast<GenericClient *>(fd);
if(bufferevent_write(ctx->bev, buf, len) == 0) return len;
return -1;
}

bool GenericClient::Reconnect()
{
Scope;
Disconnect();

switch(security) {

case Security::none:
case Security::ssh:
bev = bufferevent_socket_new(
service->base,
-1,
BEV_OPT_CLOSE_ON_FREE);
if(security == Security::none) break;

ssh = libssh2_session_init();
if(!ssh) {
Fatal << GetName() << ": libssh2_session_init failed";
return false;
}
libssh2_session_set_blocking(ssh, 0);
libssh2_session_callback_set(ssh, LIBSSH2_CALLBACK_SEND, (void *)SSH_send);
libssh2_session_callback_set(ssh, LIBSSH2_CALLBACK_RECV, (void *)SSH_recv);
libssh2_trace(ssh, ~0);
break;

case Security::ssl:
//...

default:
Fatal << GetName() << ": invalid security specifier";
return false;
}

if(!bev) {
Error << GetName() << ": socket allocation failed";
return false;
}

bufferevent_setcb(bev, GetReader(), GetWriter(), GetEventHandler(), this);
bufferevent_enable(bev, EV_READ | EV_WRITE);

if(bufferevent_socket_connect_hostname(
bev,
service->dns_base,
AF_UNSPEC,
host.c_str(),
port) != 0) {

Error << GetName() << ": connection attempt cannot be initiated";
return false;
}

Trace << GetName() << ": connecting";
state = State::connecting;
return true;
}

bool
GenericClient::Connect(const string &new_destination, Security new_security)
{
destination = new_destination;
security = new_security;

auto colon = destination.find_last_of(':');
if(colon == string::npos) {
Scope;
Error << GetName() << ": destination port not specified";
return false;
}

host = destination.substr(0, colon);
port = strtoll(destination.substr(colon + 1).c_str(), 0, 10);
return Reconnect();
}

bool GenericClient::SSH_handshake()
{
Scope;
auto rc = libssh2_session_handshake(
ssh,
reinterpret_cast<libssh2_socket_t>(this)
);

if(rc != 0) {
if(rc == LIBSSH2_ERROR_EAGAIN) return true;
Error << GetName() << ": error " << rc;
Disconnect();
return OnConnectFailure();
}

// FIXME: check host fingerprint

state = State::getting_authentication_methods;
Trace << GetName() << ": getting authentication methods";
return SSH_get_authnetication_methods();
}

bool GenericClient::SSH_get_authnetication_methods()
{
Scope;
auto list = libssh2_userauth_list(ssh, username.c_str(), username.size());
if(list == 0) {
auto rc = libssh2_session_last_error(ssh, 0, 0, 0);
if(rc == LIBSSH2_ERROR_EAGAIN) return true;
Error << GetName() << ": error " << rc;
Disconnect();
return OnConnectFailure();
}
state = State::authenticating;
Trace << GetName() << ": authenticating using " << list;
return SSH_authenticate();
}

void GenericClient::SSH_authentication_response(
const char *name_str,
int name_len,
const char *instruction_str,
int instruction_len,
int num_prompts,
const struct _LIBSSH2_USERAUTH_KBDINT_PROMPT *prompts,
struct _LIBSSH2_USERAUTH_KBDINT_RESPONSE *responses,
void **abstract)
{
string name(name_str, name_len);
string instruction(instruction_str, instruction_len);

Scope;
Info << "Name: " << name;
Info << "Instruction: " << instruction;

for(auto i = 0; i < num_prompts; i++) {
string prompt(prompts[i].text, prompts[i].length);
Info << "Prompt: " << prompt;

responses[i].text = strdup("test");
responses[i].length = 9;
}
}

bool GenericClient::SSH_authenticate()
{
Scope;
auto rc = libssh2_userauth_keyboard_interactive_ex(
ssh,
username.c_str(),
username.size(),
SSH_authentication_response);

if(rc != 0) {
if(rc == LIBSSH2_ERROR_EAGAIN) return true;
Error << GetName() << ": error " << rc;
Disconnect();
return OnConnectFailure();
}

state = State::opening_channel;
Info << GetName() << ": authenticated";
return SSH_open_channel();
}

bool GenericClient::SSH_open_channel()
{
Scope;
channel = libssh2_channel_open_session(ssh);
if(!channel) {
auto rc = libssh2_session_last_error(ssh, 0, 0, 0);
if(rc == LIBSSH2_ERROR_EAGAIN) return true;
Error << GetName() << ": error " << rc;
Disconnect();
return OnConnectFailure();
}

//this does not help
//state = State::changing_channel_settings;
//Info << GetName() << ": changing channel settings";
//return SSH_change_channel_settings();

state = State::running_shell;
Info << GetName() << ": runninsh shell";
return SSH_run_shell();
}

bool GenericClient::SSH_change_channel_settings()
{
Scope;
auto rc = libssh2_channel_handle_extended_data2(
channel,
LIBSSH2_CHANNEL_EXTENDED_DATA_MERGE
);
if(rc != 0) {
if(rc == LIBSSH2_ERROR_EAGAIN) return true;
Error << GetName() << ": error " << rc;
Disconnect();
return OnConnectFailure();
}
state = State::running_shell;
Info << GetName() << ": runninsh shell";
return SSH_run_shell();
}

bool GenericClient::SSH_run_shell()
{
Scope;
auto rc = libssh2_channel_shell(channel);
if(rc != 0) {
if(rc == LIBSSH2_ERROR_EAGAIN) return true;
Error << GetName() << ": error " << rc;
Disconnect();
return OnConnectFailure();
}
state = State::established;
Info << GetName() << ": session established";
if(!OnConnect()) return false;
return SSH_read_channel();
}

bool GenericClient::SSH_read_channel()
{
Scope;
while(true) {
int len = GetRecvMaxSize() - recv_used;
char *start = &recv.front();
if(len <= 0) {
if(!truncated) {
truncated = true;
Error << GetName() << ": truncating message";
if(!OnReadLine(start, recv_used)) return false;
}

recv_used = 0;
len = GetRecvMaxSize();
}

Trace << "Calling libssh2_channel_read_ex() with " << len;

char *p = start + recv_used;
auto rc = libssh2_channel_read_ex(channel, 0, p, len);
if(rc <= 0) {
if(rc == 0 || LIBSSH2_ERROR_EAGAIN) return true;
Error << GetName() << ": the channel has been closed";
Disconnect();
return OnDisconnect();
}
len = rc;

char *p_bound = p + len;
recv_used += len;

while(p != p_bound) {
if(IsTerminatingSequence(p, p_bound - p)) {
if(truncated) truncated = false;
else if(!OnReadLine(start, p - start)) return false;

char *remaining = p + 1;
len = start + recv_used - remaining;
memcpy(start, remaining, len);
recv_used = len;
p = start;
p_bound = p + len;
continue;
}
p++;
}
}
}

void GenericClient::EventHandler(
struct bufferevent *bev,
short event,
void *data)
{
Scope;
bool still_active = true;
auto client = static_cast<GenericClient *>(data);

if(event & BEV_EVENT_CONNECTED) {
if(client->state == State::connecting) {

if(client->security == Security::ssh) {
client->state = State::handshaking;
Trace << client->GetName() << ": creating secure connection";
still_active = client->SSH_handshake();

} else {
client->state = State::established;
Info << client->GetName() << ": connection established";
still_active = client->OnConnect();
}

} else {

Fatal << client->GetName()
<< ": unexpected event " << HEX(event)
<< " in state " << (int)client->state;
still_active = false;
}

} else if(event & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
if(client->state == State::connecting) {

Error << client->GetName() << ": connection failed";
client->Disconnect();
still_active = client->OnConnectFailure();

} else if(client->state == State::established) {

Info << client->GetName() << ": connection closed";
client->Disconnect();
still_active = client->OnDisconnect();

} else {

Fatal << client->GetName()
<< ": unexpected event " << HEX(event)
<< " in state " << (int)client->state;
still_active = false;
}

} else {
Fatal << client->GetName()
<< ": unexpected event " << HEX(event)
<< " in state " << (int)client->state;
still_active = false;
}

if(!still_active) client->service->UnregisterComponent(client);
}

bool GenericClient::Read(struct evbuffer *input)
{
if(security == Security::ssh) {
if(state == State::handshaking) {
ssh_input = input;
bool still_active = SSH_handshake();
ssh_input = 0;
return still_active;
}
if(state == State::getting_authentication_methods) {
ssh_input = input;
bool still_active = SSH_get_authnetication_methods();
ssh_input = 0;
return still_active;
}
if(state == State::authenticating) {
ssh_input = input;
bool still_active = SSH_authenticate();
ssh_input = 0;
return still_active;
}
if(state == State::opening_channel) {
ssh_input = input;
bool still_active = SSH_open_channel();
ssh_input = 0;
return still_active;
}
if(state == State::running_shell) {
ssh_input = input;
bool still_active = SSH_run_shell();
ssh_input = 0;
return still_active;
}
if(state == State::established) {
ssh_input = input;
bool still_active = SSH_read_channel();
ssh_input = 0;
return still_active;
}
}
        //....
}

void GenericClient::Reader(struct bufferevent *bev, void *data)
{
auto client = static_cast<GenericClient *>(data);
struct evbuffer *input = bufferevent_get_input(bev);

bool still_alive = client->Read(input);
if(!still_alive) client->service->UnregisterComponent(client);
}

Best regards,
Sten Kultakangas

_______________________________________________
libssh2-devel https://cool.haxx.se/cgi-bin/mailman/listinfo/libssh2-devel
Received on 2017-07-01