Subject: Libss2 in multithread environment

Libss2 in multithread environment

From: Babaladi, Prabhanand <Prabhanand_Babaladi_at_bmc.com>
Date: Wed, 21 Apr 2010 04:08:32 -0500

Hi,

I am trying to use the libssh2 library in multithread environment, but it is not working in my case. In short what I am doing is,

1. Creating session

2. In a multithread environment, each thread is sharing this session in synchronized manner and getting the channels.

3. Using channel which got above step execute the command and read output.

I am using libssh2 version 1.2.3 and build with VC++ 8 and code generation setting as "Multi-threaded (/MT)".

        At present it is not working correctly in my environment, It would be great help if you point out what's wrong in this approach or code.

Here is the code snapshot which I am trying. Let me know if you need more details.

/****************************************************************************************************************************************/
#include "testMain.h"
#include <stdio.h>
#include<conio.h>

#ifdef WIN32
 #include "libssh2/libssh2_config.h"
 #include <windows.h>
 #include <winsock2.h>
 #include <process.h>
#else
 #include <netinet/in.h>
 #include <sys/socket.h>
 #include <unistd.h>
 #include <arpa/inet.h>
 #include <netdb.h>
#endif

#include <sys/types.h>
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
#include <ctype.h>
#include <sstream>
#include <iostream>
#include <string>

int m_set_non_blocking;
string m_hostid;
string m_userid;
string m_password;
bool m_isValid;

static const int ssh_reconnect_interval = 1; //seconds
static const int ssh_reconnect_timeout = 300; // seconds

LIBSSH2_SESSION *m_session;
HANDLE myMutex1;
HANDLE myMutex2;
HANDLE myMutex3;

int chanelRetcode = 1;
string str ;
std::string command = "ls -l"; //or command of your choice
string cmdError = "";
char* err;

int set_non_blocking = 0;
unsigned long hostaddr;
int sock, auth_pw = 0;
struct sockaddr_in sin;
const char *fingerprint;
char *userauthlist;

struct hostent *remoteHost;
struct in_addr addr;

static int waitsocket(int socket_fd, LIBSSH2_SESSION *session)
{
    struct timeval timeout;
    int rc;
    fd_set fd;
    fd_set *writefd = NULL;
    fd_set *readfd = NULL;
    int dir;

    timeout.tv_sec = 10;
    timeout.tv_usec = 0;

    FD_ZERO(&fd);

    FD_SET(socket_fd, &fd);

    /* now make sure we wait in the correct direction */
    dir = libssh2_session_block_directions(session);

    if(dir & LIBSSH2_SESSION_BLOCK_INBOUND)
        readfd = &fd;

    if(dir & LIBSSH2_SESSION_BLOCK_OUTBOUND)
        writefd = &fd;

    rc = select(socket_fd + 1, readfd, writefd, NULL, &timeout);
    return rc;
}

void testMain()
{

        m_set_non_blocking = 0;

        m_hostid = "hostid";
        m_userid = "userid";
        m_password = "password";

#ifdef WIN32
    WSADATA wsadata;
    WSAStartup(MAKEWORD(2, 0), &wsadata);
#endif

    if ( -1 == inet_addr(m_hostid.c_str()))
    {
        remoteHost = gethostbyname(m_hostid.c_str());
        if (!remoteHost)
        {
            printf("SSHConnection Failure due to Unreachable remote host");
        }
        else
        {
            int i = 0;
            while (remoteHost->h_addr_list[i] != 0)
            {
                addr.s_addr = *(u_long *) remoteHost->h_addr_list[i++];
            }
            hostaddr = inet_addr(inet_ntoa(addr));
        }
    }
    else
    {
        hostaddr = inet_addr(m_hostid.c_str());
    }

    /* Ultra basic "connect to port 22 on localhost"
     * Your code is responsible for creating the socket establishing the connection
     */
    sock = socket(AF_INET, SOCK_STREAM, 0);
#ifndef WIN32
    fcntl(sock, F_SETFL, 0);
#endif
    sin.sin_family = AF_INET;
    sin.sin_port = htons(22);
    sin.sin_addr.s_addr = hostaddr;
    if (connect(sock, (struct sockaddr*)(&sin),
                sizeof(struct sockaddr_in)) != 0)
    {
        m_isValid = false;
        printf("SSHConnection::SSHConnection failed to connect! ");
    }

    /* Create a session instance and start it up
     * This will trade welcome banners, exchange keys, and setup crypto, compression, and MAC layers
     */
    m_session = libssh2_session_init();

        /* tell libssh2 we want it all done non-blocking */
    libssh2_session_set_blocking(m_session, 0);

    int returnCode = 1; // Non success value
        int max_retries = (int) ssh_reconnect_timeout/ssh_reconnect_interval;

        int rc;
        while ((rc = libssh2_session_startup(m_session, sock)) == LIBSSH2_ERROR_EAGAIN);
    if (rc) {
        printf("Failure establishing SSH session: %d\n", rc);
        return;
    }
    //libssh2_session_startup(m_session, sock);
        returnCode = rc;
        if ( returnCode == LIBSSH2_ERROR_NONE)
        {
                        printf("SSHConnection::SSHConnection SSH connection established successfully" );
        }
        else if ( returnCode == LIBSSH2_ERROR_BANNER_NONE)
        {
                        // The server is not accepting any more SSH connection.
                        // Sleep for configured amount of time and then retry
                        int attempts = 0;
                        while(returnCode != LIBSSH2_ERROR_NONE &&
                                                        returnCode != LIBSSH2_ERROR_BANNER_NONE &&
                                                        attempts < max_retries)
                        {
#ifdef WIN32
                                        Sleep (ssh_reconnect_interval * 1000); // The input is expected in msec
#else
                                        sleep (ssh_reconnect_interval);
#endif
                                        libssh2_session_startup(m_session, sock);
                                        //returnCode = check_ssh_error(m_session, "libssh2_session_startup - 2");
                                        attempts++;
                                        if (returnCode == LIBSSH2_ERROR_NONE)
                                        {
                                                        printf( "SSHConnection::SSHConnection SSH connection established successfully on attempt number %d", attempts );
                                        }
                        }
                        // You come out of while when
                        // 1. You got the connection sucessfully
                        // 2. encountered error other than LIBSSH2_ERROR_BANNER_NONE
                        // In these cases report the specific error and exit.
                        if (returnCode != LIBSSH2_ERROR_NONE && returnCode != LIBSSH2_ERROR_BANNER_NONE)
                        {
                                        //check_ssh_error(m_session, "libssh2_session_startup - 3");
                                        m_isValid = false;
                                        ostringstream oss;
                                        oss << "SSHConnection Failure establishing SSH session after " << attempts << " attempts";
                                        //printf(oss.str());
                        }
        }
        else
        {
                        //check_ssh_error(m_session, "libssh2_session_startup - 4");
                        m_isValid = false;
                        printf("SSHConnection Failure establishing SSH session");
        }

    /* At this point we havn't authenticated,
    * The first thing to do is check the hostkey's fingerprint against our known hosts
    * Your app may have it hard coded, may go to a file, may present it to the user, that's your call
    */

    fingerprint = libssh2_hostkey_hash(m_session, LIBSSH2_HOSTKEY_HASH_MD5);

                        while ((rc = libssh2_userauth_password(m_session, m_userid.c_str(), m_password.c_str())) == LIBSSH2_ERROR_EAGAIN);
        if (rc) {
            printf("Authentication by password failed.\n");
            return;
        }

    printf( "SSHConnection::SSHConnection : connection Create success" );
    m_isValid = true;
    return ;

}
string readStdout(LIBSSH2_CHANNEL *channel)
{

    char *buf;
    int test_err;
    int timeout = 100;
    int nfds = 1;
    LIBSSH2_POLLFD *fds;
    string cmdout = "";

    size_t bufsiz = 8193, ret = 0;

    if ((buf = (char *) malloc (bufsiz)) == 0)
        return cmdout;

    test_err = 1;
    if (m_set_non_blocking == 1)
    {
        /* poll for stdout */
        if (( fds = (struct _LIBSSH2_POLLFD *) malloc (sizeof (LIBSSH2_POLLFD) * 2)) == 0)
            return cmdout;
        fds[0].type = LIBSSH2_POLLFD_CHANNEL;
        fds[0].fd.channel = channel;
        fds[0].events = LIBSSH2_POLLFD_POLLIN;
        fds[0].revents = LIBSSH2_POLLFD_POLLIN;

        test_err = libssh2_poll (fds, nfds, timeout);

        free (fds);
    }

    ret = 0;
    if (test_err > 0)
    {
        /* Get STDOUT */
        ret = libssh2_channel_read (channel, buf, bufsiz - 10);
    }
    if ((ret > 0) && (ret <= bufsiz))
    {
        cmdout = buf;
        cmdout = cmdout.substr(0, ret);
    }
    free (buf);

    return cmdout;
}

void createChannelNew()
{

    LIBSSH2_CHANNEL *channel = 0;

        WaitForSingleObject( myMutex1, INFINITE );
        printf("\nGOT CHANNEL----------------\n");

    /* Exec non-blocking on the remove host */
    while( (channel = libssh2_channel_open_session(m_session)) == NULL &&
           (libssh2_session_last_error(m_session,NULL,NULL,0) == LIBSSH2_ERROR_EAGAIN ||
                            libssh2_session_last_error(m_session,NULL,NULL,0) == LIBSSH2_ERROR_SOCKET_SEND) )
    {
                        printf("\n GOINT IN WAIT TO GET SESSION----------------\n");
        waitsocket(sock, m_session);
    }

        if( channel == NULL )
    {
        printf("Error in CHANNEL _________________________\n");

                        char* err = 0;
                        libssh2_session_last_error(m_session, &err, NULL, 0);
                        printf("\nOpen channel failed %s\n", err);
                        int dw = libssh2_session_last_errno(m_session);

                        if(LIBSSH2_ERROR_EAGAIN == dw){
                                        printf("\nOpen channel failed 1\n" );
                        }
                        if(LIBSSH2_ERROR_CHANNEL_FAILURE == dw){
                                        printf("\nOpen channel failed 2\n" );
                        }

                        if(LIBSSH2_ERROR_SOCKET_SEND == dw){
                                        printf("\nOpen channel failed 3\n" );
                        }

                        if(LIBSSH2_ERROR_ALLOC == dw){
                                        printf("\nOpen channel failed 4\n" );
                        }
        return;
    }
         ReleaseMutex( myMutex1 );

     libssh2_channel_handle_extended_data(channel, LIBSSH2_CHANNEL_EXTENDED_DATA_NORMAL);

                          int rc;
                          while( (rc = libssh2_channel_exec(channel, command.c_str())) == LIBSSH2_ERROR_EAGAIN )
                                        {
                                                        printf("\n GOINT IN WAIT TO GET CHANNEL----------------\n");
                                                        waitsocket(sock, m_session);
                                        }

          if( rc != 0 )
    {
        printf("Error in cmd execution _______________________%d\n", rc);
        return;
    }
          printf(" ****** cmd execution DONE %d\n", rc);

                        int bytecount = 0;
                        string cmdOutput ="";
                        for( ;; )
                        {
                                        /* loop until we block */
                                        int rc;
                                        do
                                        {
                                                        int bufsiz = 0x4000;
                                                        char buffer[0x4000];
                                                        rc = libssh2_channel_read( channel, buffer, sizeof(buffer) );

                                                        if ((rc > 0))
                                                        {
                                                                        string op = buffer;
                                                                        cmdOutput += op.substr(0, rc);
                                                        }
                                        }
                                        while( rc > 0 );

                                        /* this is due to blocking that would occur otherwise so we loop on
                                           this condition */

                                        char* err = 0;
                                        libssh2_session_last_error(m_session, &err, NULL, 0);

                                        if( rc == LIBSSH2_ERROR_EAGAIN )
                                        {
                                                        waitsocket(sock, m_session);
                                        }
                                        else{

                                                         printf(" ****** in read channel exit %d %s\n", rc, err);
                                                        break;
                                        }
                        }

                        printf("\nEXECUTED CMD CHANNEL----------------\n");

    printf("****************************************************************************************:\n");
        printf("OUTPUT:%s", cmdOutput.c_str());
        printf("****************************************************************************************:\n");

   // }

}

void myfun( void *arg )
{

        int *p = (int*)arg;
        printf("\nFrom thread %d \n",*p);
    createChannelNew();

}

void main()
{
    testMain();
    //createChannelNew();
        myMutex1 = CreateMutex( NULL, FALSE, NULL );
    printf("\nEnter main new Befire thread\n");
        int x =0;
    for(int i=0; i<10;i++){
            _beginthread( myfun, 0, (void*)&x );
                        }
    printf("\nEnter main new end \n");

    getch();
        CloseHandle( myMutex1 );

}

/****************************************************************************************************************************************/

Thanks,
-Prabhanand

_______________________________________________
libssh2-devel http://cool.haxx.se/cgi-bin/mailman/listinfo/libssh2-devel
Received on 2010-04-21