Using kqueue with 2 threads

Hi,

I'm trying to use a kqueue to listen for VNODE events on a worker thread. I want new events to be registered with the kqueue by a separate thread.

I have been referring to this example http://doc.geoffgarside.co.uk/kqueue/file.html

Here are the relevant parts of the code as it stands at the moment.

In my worker thread:

Code:
std::cerr << "Started worker" << std::endl;

    struct kevent ke;
    int i;

    while ( !mStopRequested ) {

        // Clear the result slot. Use the object's size: in C++ the bare name
        // `kevent` refers to the kevent() function, not to struct kevent.
        memset(&ke, 0x00, sizeof(ke));

        // No changes to submit, room for one result, NULL timeout.
        i = kevent(kq, NULL, 0, &ke, 1, NULL);
        if ( i == -1 ) {
            // The failure reason is in errno; the return value itself is only -1.
            std::cerr << "kqueue produced error: " << strerror(errno) << std::endl;
            continue; // todo is this the best thing todo?
        }

        std::cerr << "Beep: " << i << std::endl;

    }

    std::cerr << "Shutting down worker" << std::endl;

In the other thread:

Code:
int fd = open(fileName.c_str(), O_RDONLY);
    if ( fd == -1 ) {
        std::cerr << "Failed to open file: " << fileName << " Error: " << strerror(errno) << std::endl;
        // todo throw exception
    } else {
        // Only register when open() succeeded; registering descriptor -1
        // would just produce a second, misleading error.
        struct kevent ke;

        // Watch the open file for delete, rename and extend notifications.
        EV_SET(&ke, fd, EVFILT_VNODE, EV_ADD, NOTE_DELETE | NOTE_RENAME | NOTE_EXTEND, 0, NULL);

        // Submit the change only (no event list), so this call does not wait for events.
        if (kevent(kq, &ke, 1, NULL, 0, NULL) == -1) {
            std::cerr << "kevent produced error: " << strerror(errno) << std::endl;
        }
    }


When I run my code, the kevent call in the worker thread doesn't block at all. Any ideas what I'm missing, or whether what I want to do is even possible?

Ben
 
Altering the code as follows prints "extended" over and over, although I'm only writing to the file once.

Code:
std::cerr << "Started worker" << std::endl;

    struct kevent ke;
    int i;

    while ( !mStopRequested ) {

        memset(&ke, 0x00, sizeof(ke));

        // No changes to submit, room for one result, NULL timeout.
        i = kevent(kq, NULL, 0, &ke, 1, NULL);
        if ( i == -1 ) {
            // The failure reason is in errno; the return value itself is only -1.
            std::cerr << "kqueue produced error: " << strerror(errno) << std::endl;
            continue; // todo is this the best thing todo?
        } else if ( i == 0 ) {
            std::cerr << "kqueue time limit expired" << std::endl;
            continue;
        }

        // Report the first matching notification flag carried by the event.
        if ( ke.fflags & NOTE_DELETE ) {
            std::cerr << "delete" << std::endl;
        } else if ( ke.fflags & NOTE_RENAME ) {
            std::cerr << "rename" << std::endl;
        } else if ( ke.fflags & NOTE_EXTEND ) {
            std::cerr << "extended" << std::endl;
        } else {
            std::cerr << "some over event" << std::endl;
        }


        // Throttle the loop so repeated deliveries stay readable.
        sleep(1);

    }

    std::cerr << "Shutting down worker" << std::endl;
 
I have put together the following example and can still see the problem. Once the file is written to, "extended" is printed every second. If I remove the sleep, it prints a whole lot more. So it seems that the call to kevent in the worker_thread function blocks until the file is written to, and then returns the same event straight away on every subsequent call.

Is this expected? Am I meant to clear the event somehow?

Code:
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <sys/errno.h>
#include <sys/stat.h>
#include <sys/types.h>   // must precede <sys/event.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>

using namespace std;

/** Argument bundle handed to the worker thread through its void* parameter. */
struct wrapper {
    int kq;  // kqueue descriptor the worker waits on
};

/**
 * Thread entry point: waits on a kqueue and logs each vnode event it receives.
 *
 * The loop ends when a NOTE_DELETE notification arrives or when kevent()
 * fails with an error that retrying cannot fix.
 *
 * @param obj  pointer to a `wrapper`; its `kq` member is the kqueue to wait on
 * @return     always 0
 */
static void* worker_thread(void* obj) {

    const int kq = static_cast<wrapper*>(obj)->kq;

    std::cerr << "Started worker" << std::endl;

    while ( true ) {

        struct kevent ke;
        memset(&ke, 0x00, sizeof(ke));

        // No changes to submit, room for one result, NULL timeout.
        const int count = kevent(kq, NULL, 0, &ke, 1, NULL);
        if ( count == -1 ) {
            // The failure reason is in errno; the return value itself is only -1.
            const int err = errno;
            if ( err == EINTR ) {
                continue; // interrupted by a signal: simply wait again
            }
            std::cerr << "kqueue produced error: " << strerror(err) << std::endl;
            break; // e.g. EBADF never recovers, so retrying would only spin
        } else if ( count == 0 ) {
            std::cerr << "kqueue time limit expired" << std::endl;
            continue;
        }

        if ( ke.filter == EVFILT_VNODE ) {

            // Report the first matching notification flag carried by the event.
            if ( ke.fflags & NOTE_DELETE ) {
                std::cerr << "delete" << std::endl;

                break; // stop watching once the file has been deleted
            } else if ( ke.fflags & NOTE_RENAME ) {
                std::cerr << "rename" << std::endl;
            } else if ( ke.fflags & NOTE_EXTEND ) {
                std::cerr << "extended" << std::endl;
            } else {
                std::cerr << "some other fflags" << std::endl;
            }

        } else {
            std::cerr << "some other filter" << std::endl;
        }

        // Throttle the loop so repeated deliveries stay readable.
        sleep(1);

    }

    std::cerr << "Shutting down worker" << std::endl;

    return 0;
}

/**
 * Writes one key/value record to `fileName`, replacing any previous content.
 *
 * Record layout, in order: the key length as a raw size_t, the key bytes,
 * the value length as a raw size_t, the value bytes.
 *
 * Failures to open, write or close the file are reported on std::cerr and
 * are not propagated to the caller.
 *
 * @param fileName  path of the file to (re)write
 * @param key       key bytes to store
 * @param value     value bytes to store
 */
void writeToFile(std::string fileName, std::string key, std::string value) {

    std::fstream registryEntry;

    // Turn stream failures into exceptions so every step is checked.
    registryEntry.exceptions(std::ifstream::failbit | std::ifstream::badbit);

    try {

        registryEntry.open(fileName.c_str(), std::ios::out | std::ios::binary);

        const size_t keyLength = key.length();
        registryEntry.write(reinterpret_cast<const char*>(&keyLength), sizeof(size_t));
        registryEntry.write(key.c_str(), keyLength);

        const size_t valueLength = value.length();
        registryEntry.write(reinterpret_cast<const char*>(&valueLength), sizeof(size_t));
        registryEntry.write(value.c_str(), valueLength);

        // Close inside the try block: close() raises failbit when it fails
        // (including when open() never succeeded), and with exceptions enabled
        // that would otherwise escape this function uncaught.
        registryEntry.close();

        std::cout << "Set entry with key: " << key << std::endl;

    } catch (const std::ifstream::failure& e) {
        // todo throw exception
        std::cerr << "Failed to set registry entry. Error: " << e.what() << std::endl;
    }

    // If the stream is still open here, its destructor closes it without throwing.
}

/**
 * Demonstrates registering a vnode watch from one thread while another
 * thread is already waiting on the same kqueue.
 *
 * Creates the kqueue, starts worker_thread on it, registers /tmp/tt for
 * delete/rename/extend notifications, rewrites the file once and then waits
 * five seconds so the worker's output can be observed.
 */
void test2() {

    const char* const path = "/tmp/tt";

    int kq = kqueue();

    if ( kq == -1 ) {
        std::cerr << "Failed to create kqueue. Error: " << strerror(errno) << std::endl;
        return;
    }

    pthread_t worker;

    // The worker is never joined, so its argument must not live on this
    // function's stack; static storage keeps the pointer valid after return.
    static wrapper wrap;
    wrap.kq = kq;

    // pthread_create() returns the error number itself instead of setting errno.
    const int rc = pthread_create(&worker, 0, worker_thread, &wrap);
    if ( rc != 0 ) {
        std::cerr << "Failed to create listener thread. Error: " << strerror(rc) << std::endl;
        close(kq);
        return;
    }
    pthread_detach(worker); // make it explicit that nobody joins this thread

    int fd = open(path, O_RDONLY);
    if ( fd == -1 ) {
        std::cerr << "Failed to open file: " << path << " Error: " << strerror(errno) << std::endl;
        // todo throw exception
        // Nothing to watch. kq is left open because the worker is waiting on it.
        return;
    }

    struct kevent ke;

    // Registered without EV_CLEAR, so once triggered the event is reported
    // again by every later kevent() call; add EV_CLEAR to the flags to have
    // its state reset after it is retrieved.
    EV_SET(&ke, fd, EVFILT_VNODE, EV_ADD, NOTE_DELETE | NOTE_RENAME | NOTE_EXTEND, 0, NULL);

    if (kevent(kq, &ke, 1, NULL, 0, NULL) == -1) {
        std::cerr << "kevent produced error: " << strerror(errno) << std::endl;
        close(fd);
        return;
    }

    sleep(1); // give the worker time to block in kevent()

    writeToFile(path, "key1", "value1");

    sleep(5); // leave time to watch the worker's output

    // Closing the descriptor also removes the kevent registered for it.
    close(fd);
}

/*
 * Program entry point: runs the kqueue demonstration once, then exits
 * with status 0. The command-line arguments are not used.
 */
int main(int argc, char** argv) {
    test2();
    return 0;
}
 
I changed this line:

Code:
// Original registration: no EV_CLEAR, so a triggered event keeps being returned.
EV_SET(&ke, fd, EVFILT_VNODE, EV_ADD , NOTE_DELETE | NOTE_RENAME | NOTE_EXTEND, 0, NULL);

to

Code:
// EV_CLEAR resets the event's state after it has been retrieved.
EV_SET(&ke, fd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_DELETE | NOTE_RENAME | NOTE_EXTEND, 0, NULL);

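Making this change stops the repeated events. EV_CLEAR resets the event's state once it has been retrieved, so each change to the file is reported only once.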
 
Back
Top