Куренто: постоянно генерируется странный лог.

Я работал над kurento последние пару месяцев, и сегодня внезапно я начал получать следующий журнал:

[0x00007f40b9f6d700]   debug KurentoMediaElementImpl   MediaElementImpl.cpp:660 disconnectAll() <kmswebrtcendpoint3>  Retry disconnect all <kmshubport3>

этот журнал создается непрерывно и вызывает 100% загрузку процессора с созданием нескольких файлов журнала объемом более 100 МБ.

Я также получил этот журнал в другом файле журнала

2016-04-15 20:05:47,914497 23751 [0x00007fd903de1700]   error KurentoWorkerPool         WorkerPool.cpp:41 workerThreadLoop()  Unexpected error while running the server: boost::filesystem::last_write_time: No such file or directory: "/var/log/kurento-media-server/media-server_2016-04-15_20-05-18.00049.pid23751.log"
2016-04-15 20:05:47,914513 23751 [0x00007fd90f7fe700]   debug KurentoMediaSet           MediaSet.cpp:470 async_delete()  Destroying HubPort -> d1975230-7fcb-498e-8d4a-bfc7afbf8bf2_kurento.MediaPipeline/528783df-ac14-47bc-a7ad-8044721563b9_kurento.HubPort
2016-04-15 20:05:47,914593 23751 [0x00007fd903de1700]   debug KurentoWorkerPool         WorkerPool.cpp:37 workerThreadLoop()  Working thread starting
2016-04-15 20:05:47,917583 23751 [0x00007fd903de1700]   debug KurentoMediaSet           MediaSet.cpp:470 async_delete()  Destroying Composite -> d1975230-7fcb-498e-8d4a-bfc7afbf8bf2_kurento.MediaPipeline/63c84bc4-0d42-4299-ae22-219c0a603837_kurento.Composite
2016-04-15 20:06:10,067442 23751 [0x00007fd9cbfff700]   debug KurentoMediaSet           MediaSet.cpp:131 doGarbageCollection()  Running garbage collector
2016-04-15 20:10:10,067626 23751 [0x00007fd9cbfff700]   debug KurentoMediaSet           MediaSet.cpp:131 doGarbageCollection()  Running garbage collector
2016-04-15 20:14:10,067789 23751 [0x00007fd9cbfff700]   debug KurentoMediaSet           MediaSet.cpp:131 doGarbageCollection()  Running garbage collector

Примечание:

Я использую пример Java с настроенным групповым вызовом, и я написал фильтр kms, в котором я использовал фильтр vadfilter, чтобы получить звуковой буфер без звука, скопировать его в объект gst_buffer до тех пор, пока не будет обнаружено молчание, а затем отправить его в веб-сервис, используя curl_easy_perform.

также, когда я использую sudo netstat -np | grep "CLOSE_WAIT"

Я вижу, что машина, которую я использую для группового звонка, и IP-адрес веб-сервиса указаны в состоянии CLOSE_WAIT, но я не уверен, находятся ли они в состоянии CLOSE_WAIT из-за сбоя в моем плагине или чего-то еще.

P.s: я использовал gst_buffer_copy_deep() gst_buffer_get_memory gst_buffer_append_memory для копирования последующих аудиобуферов до обнаружения тишины.

Будет очень полезно, если кто-то может указать что-нибудь, связанное с ошибками, с которыми я сталкиваюсь.

И это еще не все... Я введите здесь описание ссылки, на которое вы можете ссылаться, чтобы увидеть ошибки, которые я получаю. Спасибо.

Редактировать 1:

namespace kurento
{
  namespace module
  {
    namespace vadcustomfilter
    {
      void VADCustomFilterImpl::newBufferHandler (GstBuffer * buffer)
      {
        try
        {
            //instead of doing audio_buffer=null we set a flag audio_buffer_null to TRUE when we want to make a new copy of audio_buffer
            if (!audio_buffer_null )
            {
                // dest, source, flag, offset, size
                //gst_buffer_copy_into (audio_buffer, buffer, GST_BUFFER_COPY_MEMORY, 0, -1);
                GstMemory *inbuf_memory = gst_buffer_get_memory(buffer,0);
                gst_buffer_append_memory(audio_buffer,inbuf_memory);
            }
            else
            {
              audio_buffer = gst_buffer_new ();
              audio_buffer = gst_buffer_copy_deep (buffer);
              audio_buffer_null = FALSE;
            }
        }
        catch (std::bad_weak_ptr & e)
        {
          GST_ERROR ("EXCEPTION: new buffer");
        }
      }


      void VADCustomFilterImpl::busMessage (GstMessage * message)
      {
        try
        {
            switch (message->type)
            {
                case GST_MESSAGE_EOS:
                {
                    GST_WARNING ("---------------> GST_MESSAGE_EOS");
                }
                break;

                case GST_MESSAGE_ELEMENT:
                {
                    const GstStructure *structure = gst_message_get_structure (message);
                    if (structure == NULL) return;

                    const gchar *name = gst_structure_get_name (structure);
                    if (name == NULL || strlen(name) == 0) return;


                    if (strcmp (name, "vadfilter") == 0)
                    {
                        //messages are voice , silence, max_voice_reached
                        const GValue *msg_value = NULL;
                        gint64 silence_buffer = 0;

                        gst_structure_get_int64(structure, "silence", &silence_buffer);
                        silence_buffer /= sizeof(gint16);

                        //silence_buffer is the size of buffer which was emitted in chunks before silence was detected
                        //use to check for min voice size

                        if (silence_buffer > 0)
                        {
                            //min_voice_buffer_size set from java server to set a limit of min buffer size which can be sent to the web service
                            if (audio_buffer != NULL && (silence_buffer > (gint64)min_voice_buffer_size))
                            {   
                                audio_buffer_copy = NULL;
                                audio_buffer_copy = gst_buffer_copy_deep(audio_buffer);

                                push_buffer_in_queue ();
                            }
                            return;
                        }

                        gint64 max_voice = 0;
                        gst_structure_get_int64 (structure, "max_voice_reached", &max_voice);

                        if (audio_buffer != NULL && (max_voice > 0))
                          {
                            audio_buffer_copy = NULL;
                            audio_buffer_copy = gst_buffer_copy_deep(audio_buffer);
                            push_buffer_in_queue ();
                          }
                        } /* vadfilter */
                    } /* message_element */
                    break;

                default:
                    break;

                } /* switch */
            }
          catch (std::bad_weak_ptr & e)
            {
              GST_ERROR("EXCEPTION: bus message");
            }
        } /* bus message */


      void VADCustomFilterImpl::push_buffer_in_queue ()
      {
        try 
        {
            GstMapInfo map;         
            if (audio_buffer_copy != NULL)
            {

                if (gst_buffer_map (audio_buffer_copy, &map, GST_MAP_READ))
                {
                    AUDIO_STREAM *pcm = new AUDIO_STREAM;
                    pcm->size = (unsigned int) map.size;
                    pcm->data = new unsigned char[pcm->size + 1];
                    std::memcpy (pcm->data, (unsigned char *) map.data, pcm->size);

                    gst_buffer_unmap (audio_buffer_copy, &map);

                    //set the flag for audio_buffer to create a new copy
                    audio_buffer_null = TRUE;
                    fetch_response(pcm);
                    if (pcm->data != NULL)
                    {               
                        pcm->size = 0;
                        pcm->data = NULL;
                    }

                    if (pcm != NULL)
                    {           
                        delete pcm;
                        pcm = NULL;
                    }               
                }
                else
                    GST_ERROR("push_buffer_in_queue: Invalid buffer");


            }   
        }
         catch (std::bad_weak_ptr & e)
        {
          GST_ERROR ("EXCEPTION::push_buffer_in_queue ");
        } 
      }

      VADCustomFilterImpl::VADCustomFilterImpl (const boost::property_tree::ptree &conf, 
                                                std::shared_ptr<MediaPipeline> mediaPipeline, 
                                                int minVoiceBufferSize, 
                                                int maxVoiceBufferSize, 
                                                int64_t minSilenceTime, 
                                                const std::string &url, 
                                                const std::string &language):FilterImpl
                                                (conf,
                                                 std::dynamic_pointer_cast < MediaObjectImpl > (mediaPipeline))
      {

        try
        {
            min_voice_buffer_size = (gsize) minVoiceBufferSize;
            curl_url = url ;   
            accept_language = (!language.empty()) ? ("Accept-Language: " + language) : ("Accept-Language: en-US");

            g_object_set (element, "filter-factory", "vadfilter", NULL);

            g_object_get(G_OBJECT(element), "filter", &vadfilter, NULL);

            if (vadfilter  == NULL)
              {
                throw KurentoException (MEDIA_OBJECT_NOT_AVAILABLE, "Media Object not available");
              }


            g_object_set(G_OBJECT(vadfilter), "minimum-silence-time", (guint64)minSilenceTime, NULL); //2000000000
            g_object_set(G_OBJECT(vadfilter), "max-voice-buffer-size", (guint)maxVoiceBufferSize, NULL);

            g_object_unref(vadfilter);

            g_async_queue_create ();

            curl_init ();
        }
        catch (std::bad_weak_ptr & e)
        {
          GST_ERROR("EXCEPTION: constructor... ");
        } 
      }

      void VADCustomFilterImpl::postConstructor ()
      {
        try
        {
            GstBus *bus;

            std::shared_ptr < MediaPipelineImpl > pipe;

            FilterImpl::postConstructor ();

            pipe = std::dynamic_pointer_cast < MediaPipelineImpl >(getMediaPipeline ());

            bus = gst_pipeline_get_bus (GST_PIPELINE (pipe->getPipeline ()));

            /* register bus message handler */
            bus_handler_id = register_signal_handler (G_OBJECT (bus),
                                  "message",
                                  std::function <
                                  void (GstElement *,
                                    GstMessage *) >
                                  (std::
                                   bind
                                   (&VADCustomFilterImpl::
                                    busMessage, this,
                                    std::placeholders::_2)),
                                  std::dynamic_pointer_cast <
                                  VADCustomFilterImpl >
                                  (shared_from_this ()));

            /* register new_buffer handler */
            remove_silence_id = register_signal_handler(G_OBJECT(vadfilter),
                                     "buffer",
                                     std::function <
                                     void (GstElement *,
                                       GstBuffer *) >
                                     (std::
                                      bind
                                      (&VADCustomFilterImpl::
                                       newBufferHandler, this,
                                       std::placeholders::
                                       _2)),
                                     std::
                                     dynamic_pointer_cast <
                                     VADCustomFilterImpl >
                                     (shared_from_this ()));

            if (bus)
                g_object_unref (bus);   
        }
          catch (std::bad_weak_ptr & e)
        {
          GST_ERROR("EXCEPTION: post constructor... ");
        } 
      }

      VADCustomFilterImpl::~VADCustomFilterImpl ()
      {
        std::shared_ptr < MediaPipelineImpl > pipe;

        try
        {           
            if (bus_handler_id > 0)
          {
            pipe = std::dynamic_pointer_cast < MediaPipelineImpl > (getMediaPipeline ());
            GstBus *bus = gst_pipeline_get_bus (GST_PIPELINE (pipe->getPipeline ()));
            if (bus)
            {
                unregister_signal_handler (bus, bus_handler_id);
                g_object_unref (bus);
            }

          }

          /*if (remove_silence_id > 0)
            {
              unregister_signal_handler (element, remove_silence_id);
            }
            */

          /* curl cleanup */
          curl_cleanup ();

          /* buffer queue cleanup */
            if (g_async_buffer_queue)
            {
                g_async_queue_unref (g_async_buffer_queue);
                 g_async_buffer_queue = NULL;
            }


          /* audio_buffer cleanup */
            if (audio_buffer_copy != NULL)
            {
                audio_buffer_copy = NULL;   
            }

            if (audio_buffer != NULL)
            {
                audio_buffer = NULL;
                audio_buffer_null = TRUE;
            }

        }
        catch (std::bad_weak_ptr & e)
        {
          remove_silence_id = -1;
          GST_ERROR("EXCEPTION: destructor ");
        }
      }


      size_t VADCustomFilterImpl::curl_callback (char *contents, size_t size,
                         size_t nmemb, void *userp)
      {
        size_t realsize = 0;  
        try
        {
            realsize = size * nmemb;    /* calculate buffer size */
            ((std::string *) userp)->append ((char *) contents, realsize);
        }
        catch (std::bad_weak_ptr & e)
        {
          GST_ERROR("EXCEPTION: curl_callback");
        }
        return realsize;
      }

      int VADCustomFilterImpl::curl_init ()
      {
        /* init curl handle */
        if ((curl_handle = curl_easy_init ()) == NULL)
            return 1;


        /* set content type */
        headers = curl_slist_append (headers, "Accept: application/xml");

        headers = curl_slist_append (headers, "Transfer-Encoding: chunked");
        headers = curl_slist_append (headers, accept_language.c_str());     

        /* set curl options */
        curl_easy_setopt (curl_handle, CURLOPT_VERBOSE, 0);
        curl_easy_setopt (curl_handle, CURLOPT_POST, 1);
        curl_easy_setopt (curl_handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
        curl_easy_setopt (curl_handle, CURLOPT_HTTPHEADER, headers);
        curl_easy_setopt (curl_handle, CURLOPT_TIMEOUT, 10);          
        curl_easy_setopt (curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0"); /* set default user agent */                
        curl_easy_setopt (curl_handle, CURLOPT_URL, curl_url.c_str ()); /* set url to fetch */      

        return 0;
      }


      int VADCustomFilterImpl::curl_fetch (AUDIO_STREAM * pcm)
      {
        curl_op_in_progress = TRUE;

        if (curl_handle == NULL)
            return 1;

        CURLcode rcode;

        try
        {
            response.clear();
            curl_easy_setopt (curl_handle, CURLOPT_WRITEFUNCTION, curl_callback); /* set calback function */        
            curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, &response);/* pass fetch struct pointer */
            curl_easy_setopt (curl_handle, CURLOPT_POSTFIELDSIZE, pcm->size);
            curl_easy_setopt (curl_handle, CURLOPT_POSTFIELDS,  pcm->data);

            rcode = curl_easy_perform(curl_handle);

            handle_curl_response(rcode);
        }   
        catch (std::bad_weak_ptr & e)
        {
          GST_ERROR("EXCEPTION: curl_fetch ");
        }


        return 1;
      }

    void VADCustomFilterImpl::handle_curl_response(CURLcode rcode)
    {
        curl_op_in_progress = FALSE;
        switch (rcode)
        {
            case CURLE_OK:
            {

                if (!response.empty())
                {
                    SendresponseReceivedEvent(response);
                    response.clear();
                }
            }
                break;

            case CURLE_OPERATION_TIMEDOUT:
            case CURLE_HTTP_RETURNED_ERROR: /* HTTP Error, >= 400*/
            {
                GST_ERROR ("ERROR: handle_curl_response: (%s)", curl_easy_strerror (rcode));

                goto pop_pcm_from_queue;
            }
            break;

            default:
                GST_ERROR ("ERROR: Failed to fetch url (%s) - curl said: %s\n", curl_url.c_str (), curl_easy_strerror (rcode));
                break;
        } /* switch */



        pop_pcm_from_queue:
            if (g_async_queue_length (g_async_buffer_queue) > 0)
              {
                AUDIO_STREAM *pcm_popped = reinterpret_cast < AUDIO_STREAM * >(g_async_queue_pop (g_async_buffer_queue));
                curl_fetch (pcm_popped);

              }
            else
                GST_ERROR ("handle_curl_response: queue is empty, no pcm to pop out");
    }

    void VADCustomFilterImpl::curl_cleanup ()
    {
        if (curl_handle)
            curl_easy_cleanup (curl_handle);

        /* free headers */
        if (headers)
            curl_slist_free_all (headers);

        curl_url.clear ();
    }

    /* fetch the response from web service, if it is already fetching the push the current audio buffer in queue */
    void VADCustomFilterImpl::fetch_response(AUDIO_STREAM * pcm)
    {
    //check queue size if > 0 then push else hit curl
    if (!curl_op_in_progress)
      {
        curl_fetch (pcm);
      }
    else
      { 
        try 
        {           
            if (g_async_buffer_queue == NULL)
              g_async_queue_create ();

            g_async_queue_push (g_async_buffer_queue, pcm);
        }
        catch (std::bad_weak_ptr & e)
        {
          GST_ERROR("EXCEPTION: curl_fetch ");
        }

      }
    }

      void VADCustomFilterImpl::g_async_queue_create ()
      {
        g_async_buffer_queue = g_async_queue_new ();
      }
      }         /* vadcustomfilter */
   }        /* module */
}       /* kurento */

person Sagar Pilkhwal    schedule 15.04.2016    source источник


Ответы (1)


Предоставляя информацию, которую вы предоставляете здесь и в этом другом связанном вопросе. Я почти уверен, что проблема в том, что ваш фильтр блокирует поток мультимедиа (вероятно, из-за того, что он отправляет http-запросы). Кроме того, все ошибки, связанные с буферами и мини-объектами, указывают на то, что вы делаете что-то неправильно в фильтре при обработке мультимедиа.

Вы должны просмотреть свой код и, возможно, проконсультироваться с экспертом, чтобы узнать источник проблемы. Без доступа к исходному коду было бы трудно просто угадать информацию, которую вы предоставляете. Даже с исходным кодом может быть трудно определить, что вызывает взаимоблокировку.

person santoscadenas    schedule 16.04.2016
comment
привет, я добавил исходный код, пожалуйста, проверьте eidt - person Sagar Pilkhwal; 18.04.2016