Changeset 431 for branches/thune

Show
Ignore:
Timestamp:
07/16/07 03:30:29 (17 months ago)
Author:
krobillard
Message:

Atoms should now be thread safe. First thread experiment runs.

Location:
branches/thune/thread_safe
Files:
14 modified

Legend:

Unmodified
Added
Removed
  • branches/thune/thread_safe/config.t

    r426 r431  
    99[x] dataflow "Component! datatype for dataflow programming" 
    1010 
     11[ ] threads  "CPU Threads" 
     12[ ] emh      "Debugger Hooks" 
    1113[ ] dt-code  "Include 'code datatype" 
    1214[ ] uds      "Library with datatype & gc system only - no eval" 
    1315             disable [bzip2 trig math3d dt-code] 
    1416 
    15 [ ] threads  "Experimental script threads" 
    16 [ ] emh      "Debugger Hooks" 
    17  
    1817;eof 
  • branches/thune/thread_safe/context.c

    r427 r431  
    2525 
    2626#define LOWERCASE(c)    if(c >= 'A' && c <= 'Z') c -= 'A' - 'a' 
     27 
     28 
     29/* 
     30   UrlanEnv::atoms & UrlanEnv::atomNames can be made thread safe through one 
     31   of the following: 
     32 
     33   1. Use LOCK_ATOMS in or around these functions: 
     34        ur_internT() 
     35        ur_atomStrT() 
     36        ur_atomCStrT() (and all use of returned pointer) 
     37        ur_atomHash() 
     38        dumpAtoms() 
     39 
     40   2. Fix size of atom arrays and throw error/assert when full. 
     41      Must still lock these functions to access head/chain AtomRec members: 
     42        ur_internT() 
     43        dumpAtoms() 
     44 
     45   Option #2 is currently being used. 
     46*/ 
    2747 
    2848 
     
    120140void dumpAtoms( UThread* ut ) 
    121141{ 
    122     LOCK_GLOBAL 
     142    LOCK_ATOMS 
    123143    { 
    124144    const char* names = ut->env->atomNames.ptr.c; 
     
    143163    } 
    144164    } 
    145     UNLOCK_GLOBAL 
     165    UNLOCK_ATOMS 
    146166} 
    147167#endif 
     
    151171  Add atom to environment. 
    152172 
    153   If the environment has multiple threads, the caller must have called 
    154   LOCK_GLOBAL. 
    155  
    156173  \param str  Name of atom. 
    157174  \param len  Number of characters. 
    158175 
    159   \returns Atom  
     176  \return  Atom  
    160177*/ 
    161178UAtom ur_internT( UThread* ut, const char* str, int len ) 
     
    171188    AtomRec* node; 
    172189 
    173  
    174190    assert( len > 0 ); 
    175191 
    176192 
    177193    // Check if atom already exists. 
     194 
     195    hash = ur_hash( str, str + len ); 
     196 
     197    LOCK_ATOMS 
    178198 
    179199    atoms = &ut->env->atoms; 
    180200    table = (AtomRec*) atoms->ptr.v; 
    181201    names = &ut->env->atomNames; 
    182  
    183     hash = ur_hash( str, str + len ); 
    184202 
    185203    node = table + (hash % atoms->avail); 
     
    228246    // Nope, add new atom. 
    229247 
    230     /* TODO: Make atoms & atomNames thread safe through one of the following: 
    231  
    232        1. Halt all other threads. 
    233        2. Fix size of atom arrays and throw error/assert when full. 
    234        3. Use LOCK_GLOBAL in or around these functions in addition to ur_intern: 
    235             ur_atomStrT() 
    236             ur_atomCStrT() 
    237             dumpAtoms() 
    238     */ 
    239  
    240248    if( atoms->used == atoms->avail ) 
    241249    { 
     250#if 1 
     251        // Atom table size is fixed so read only access does not need to be 
     252        // locked.  When the table is full, we are finished. 
     253        assert( 0 && "Atom table is full" ); 
     254        return 0;       // TODO: Report fatal error 
     255#else 
    242256        ur_arrayReserve( atoms, sizeof(AtomRec), atoms->used + 1 ); 
    243257        ur_rebuildAtomHash( atoms ); 
     
    245259 
    246260        HASH_INSERT( atoms, table, node, hash, atoms->used ) 
     261#endif 
    247262    } 
    248263    node = table + atoms->used; 
     
    253268    node->nameLen   = len; 
    254269 
     270#if 1 
     271    if( (names->used + len + 1) > names->avail ) 
     272    { 
     273        assert( 0 && "Atom name buffer is full" ); 
     274        return 0;       // TODO: Report fatal error 
     275    } 
     276#else 
    255277    ur_arrayReserve( names, sizeof(char), names->used + len + 1 ); 
     278#endif 
     279 
    256280    cp = names->ptr.c + names->used; 
    257281    ep = cp + len; 
     
    267291done: 
    268292 
    269     return node - table; 
     293    c = node - table; 
     294 
     295    UNLOCK_ATOMS 
     296 
     297    return c; 
    270298} 
    271299 
     
    278306  If added, the word is initialied as unset. 
    279307 
    280   \returns  Index of word in context. 
     308  \return  Index of word in context. 
    281309*/ 
    282310int ur_internWordT( UThread* ut, const UContext* ctx, UAtom atom ) 
     
    424452/** 
    425453  Find word in context by atom. 
    426   \returns  Word index or -1 if not found. 
     454  \return  Word index or -1 if not found. 
    427455*/ 
    428456int ur_lookupT( UThread* ut, const UContext* ctx, UAtom atom ) 
  • branches/thune/thread_safe/eval.c

    r429 r431  
    17651765    { uc_thread,       "thread" }, 
    17661766#endif 
     1767#ifdef UR_CONFIG_TASKS 
     1768    { uc_task,         "task" }, 
     1769#endif 
    17671770    { uc_showTOS,      "." }, 
    17681771    { uc_lift_local,   "lift-local" }, 
  • branches/thune/thread_safe/internal.h

    r428 r431  
    3131#define UNLOCK_GLOBAL   mutexUnlock( ut->env->mutex ); 
    3232 
     33#define LOCK_ATOMS      LOCK_GLOBAL 
     34#define UNLOCK_ATOMS    UNLOCK_GLOBAL 
     35 
    3336 
    3437typedef struct 
     
    4245} 
    4346AtomRec; 
     47 
     48 
     49/* 
     50typedef struct 
     51{ 
     52    uint8_t  type; 
     53    uint8_t  flags; 
     54    uint16_t _pad0; 
     55    uint32_t _pad1; 
     56    OSThread thread; 
     57} 
     58UCellThread; 
     59*/ 
    4460 
    4561 
  • branches/thune/thread_safe/project.r

    r373 r431  
    9393    ] 
    9494    unix  [ 
    95         libs %m 
    96         libs %bz2 
     95        libs {m bz2 pthread} 
    9796    ] 
    9897    win32 [ 
  • branches/thune/thread_safe/read_config.r

    r426 r431  
    1717       uds: 
    1818       threads: 
     19       tasks: 
    1920       macros: 
    2021       emh: 
  • branches/thune/thread_safe/series.c

    r428 r431  
    9999  Get pointers to the start and end of a series or slice. 
    100100 
    101   \returns 
     101  \return 
    102102    Pointer to UBinary if cell is a series or slice. 
    103103    Otherwise, zero is returned. 
  • branches/thune/thread_safe/thread.c

    r408 r431  
    3838 
    3939 
     40extern UCellContext ur_thrGlobal; 
     41 
    4042UThread* ur_threadMake( UrlanEnv* env, int binCount, int blkCount ) 
    4143{ 
     
    5355        if( env->threads ) 
    5456        { 
     57#if 0 
    5558            UThread* link = env->threads; 
    5659            ut->nextThread = link->nextThread; 
    5760            link->nextThread = ut; 
     61#endif 
    5862        } 
    5963        else 
     
    7175        //ur_hold( ut, UT_BINARY, binN );       // Hardcoded in recycle 
    7276        } 
     77 
     78        // Make hold & global context blocks. 
     79        ur_makeBlock( 8 );        // 0 - BLK_THREAD_HOLD 
     80        ur_makeBlock( 1024 );     // 1 - BLK_GLOBAL_WORD 
     81        ur_makeBlock( 1024 );     // 2 - BLK_GLOBAL_VAL 
     82        ur_makeBlock( 1 );        // 3 - BLK_CTX_STACK 
     83        //ur_makeBlock( 0 );        // 4 - BLK_DSTACK 
     84 
     85        ur_pushContext( ut, (UCell*) &ur_thrGlobal ); 
    7386    } 
    7487    return ut; 
     
    7689 
    7790 
    78 #ifdef UR_CONFIG_THREADS 
     91/* 
    7992void ur_threadUnlink( UThread* th ) 
    8093{ 
     
    8295    it = ((UrlanEnv*) th->env)->threads; 
    8396    if( it == th ) 
    84         ((UrlanEnv*) th->env)->threads = (UThread*) th->nextThread; 
    85     while( it->nextThread != th ) 
    86         it = it->nextThread; 
    87     it->nextThread = th->nextThread; 
     97        ((UrlanEnv*) th->env)->threads = (UThread*) th->nextTask; 
     98    while( it->nextTask != th ) 
     99        it = it->nextTask; 
     100    it->nextTask = th->nextTask; 
     101} 
     102*/ 
     103 
     104 
     105#ifdef UR_CONFIG_TASKS 
     106void ur_taskUnlink( UThread* th ) 
     107{ 
     108    UThread* it; 
     109    it = ((UrlanEnv*) th->env)->threads; 
     110    if( it == th ) 
     111        ((UrlanEnv*) th->env)->threads = (UThread*) th->nextTask; 
     112    while( it->nextTask != th ) 
     113        it = it->nextTask; 
     114    it->nextTask = th->nextTask; 
    88115} 
    89116 
    90117 
    91118// Returns zero and sets error if no threads are ready. 
    92 UThread* ur_threadNextReady( UThread* cur ) 
     119UThread* ur_taskNextReady( UThread* cur ) 
    93120{ 
    94121    UThread* next = cur; 
     
    96123    do 
    97124    { 
    98         next = next->nextThread; 
    99         if( next->state == UR_THREAD_READY ) 
    100         { 
    101             next->state = UR_THREAD_RUNNING; 
     125        next = next->nextTask; 
     126        if( next->state == UR_TASK_READY ) 
     127        { 
     128            next->state = UR_TASK_RUNNING; 
    102129            return next; 
    103130        } 
    104         else if( next->state == UR_THREAD_BLOCKED ) 
     131        else if( next->state == UR_TASK_BLOCKED ) 
    105132        { 
    106133            UCell* pc; 
     
    113140                if( pc && ! ur_is(pc, UT_UNSET) ) 
    114141                { 
    115                     next->state = UR_THREAD_RUNNING; 
     142                    next->state = UR_TASK_RUNNING; 
    116143                    return next; 
    117144                } 
     
    121148    while( next != cur ); 
    122149 
    123     ur_throwErr( cur, UR_EX_SCRIPT, "All threads are blocked" );  
     150    ur_throwErr( cur, UR_EX_SCRIPT, "All tasks are blocked" );  
    124151    return 0; 
    125152} 
     
    145172    */ 
    146173 
    147     th->state  = UR_THREAD_READY; 
     174    th->state  = UR_TASK_READY; 
    148175    th->callOp = 0; 
    149176    th->tos = th->dstack; 
     
    183210 
    184211/** 
    185   \returns cell pointer to TOS. 
     212  \return  cell pointer to TOS. 
    186213  Pops TOS if pop is non-zero. 
    187214*/ 
  • branches/thune/thread_safe/thune.c

    r427 r431  
    1111extern int ur_lessOrEqual( const UCell*, const UCell* ); 
    1212 
    13 #ifdef UR_CONFIG_THREADS 
    14 extern void ur_threadUnlink( UThread* ); 
    15 extern UThread* ur_threadNextReady( UThread* ); 
     13#ifdef UR_CONFIG_TASKS 
     14extern void ur_taskUnlink( UThread* ); 
     15extern UThread* ur_taskNextReady( UThread* ); 
    1616#endif 
    1717 
     
    164164 
    165165#ifdef UR_CONFIG_THREADS 
    166 // (block -- ) 
     166#ifdef _WIN32 
     167static DWORD WINAPI threadRoutine( LPVOID arg ) 
     168#else 
     169static void* threadRoutine( void* arg ) 
     170#endif 
     171{ 
     172    UThread* ut = (UThread*) arg; 
     173    ur_eval( ut, ut->tos[1].series.n, 0 ); 
     174    ur_threadFree( ut ); 
     175    return 0; 
     176} 
     177 
     178/* 
     179static void ur_threadCloneCell( UThread* ut, UCell* cellA, 
     180                                UThread* to, UCell* cellB ) 
     181{ 
     182    UIndex newBlkN; 
     183 
     184    switch( ur_type(cell) ) 
     185    { 
     186        case UT_BLOCK: 
     187        { 
     188            UBlock* blkA; 
     189            UBlock* blkB; 
     190            blkA = ur_blockPtr( blkN ); 
     191            newBlkN = ur_makeBlockT( to, blkA->used, &blkB );  
     192        } 
     193            break; 
     194 
     195        case UT_STRING: 
     196            break; 
     197    } 
     198} 
     199*/ 
     200 
     201 
     202// (code -- ) 
    167203UR_CALL( uc_thread ) 
    168204{ 
     205    if( ur_is(tos, UT_BLOCK) || ur_is(tos, UT_STRING) ) 
     206    { 
     207        UThread* thr = ur_threadMake( ut->env, 64, 128 ); 
     208        if( thr ) 
     209        { 
     210            OSThread pth; 
     211#ifdef _WIN32 
     212            DWORD winId; 
     213#endif 
     214 
     215            if( ur_is(tos, UT_STRING) ) 
     216            { 
     217                char* cpA; 
     218                char* cpB; 
     219                ur_seriesMem( ut, tos, &cpA, &cpB ); 
     220                thr->tos[1].series.n = ur_tokenize( thr, cpA, cpB ); 
     221            } 
     222 
     223#ifdef _WIN32 
     224            pth = CreateThread( NULL, 0, threadRoutine, thr, 0, &winId ); 
     225            if( pth == NULL ) 
     226#else 
     227            if( pthread_create( &pth, 0, threadRoutine, thr ) != 0 ) 
     228#endif 
     229            { 
     230                ur_throwErr( UR_ERR_INTERNAL, "Could not create thread" ); 
     231                return; 
     232            } 
     233 
     234#if 1 
     235            { 
     236            void* status; 
     237            pthread_join( pth, &status ); //KR 
     238            } 
     239#endif 
     240        } 
     241        UR_S_DROP; 
     242    } 
     243    else 
     244    { 
     245        ur_throwErr( UR_ERR_DATATYPE, "thread expected block!/string!" ); 
     246    } 
     247} 
     248#endif 
     249 
     250 
     251#ifdef UR_CONFIG_TASKS 
     252// (block -- ) 
     253UR_CALL( uc_task ) 
     254{ 
    169255    if( ur_is(tos, UT_BLOCK) ) 
    170256    { 
    171257        UThread* thr; 
    172258        UThread* save; 
    173         thr = ur_threadMake( ut->env ); 
     259        thr = ur_threadMake( ut->env, 32, 32 ); 
    174260        if( thr ) 
    175261        { 
     
    307393    UCell* pc; 
    308394    UCell* end; 
    309 #ifdef UR_CONFIG_THREADS 
     395#ifdef UR_CONFIG_TASKS 
    310396    int insCycle = 0; 
    311397#endif 
     
    323409execute: 
    324410 
    325 #ifdef UR_CONFIG_THREADS 
     411#ifdef UR_CONFIG_TASKS 
    326412    if( ++insCycle == 100 ) 
    327413    { 
    328         if( ut->nextThread != ut ) 
    329         { 
    330             ut->state = UR_THREAD_READY; 
     414        if( ut->nextTask != ut ) 
     415        { 
     416            ut->state = UR_TASK_READY; 
    331417switch_thread: 
    332418            PUSHC_EVAL( blkN, start, pc ); 
    333             ut = ur_threadNextReady( ut ); 
     419            ut = ur_taskNextReady( ut ); 
    334420            if( ! ut ) 
    335421                goto error; 
     
    363449        case UT_UNSET: 
    364450            --pc; 
    365 #ifdef UR_CONFIG_THREADS 
    366             if( ut->nextThread != ut ) 
    367             { 
    368                 ut->state = UR_THREAD_BLOCKED; 
     451#ifdef UR_CONFIG_TASKS 
     452            if( ut->nextTask != ut ) 
     453            { 
     454                ut->state = UR_TASK_BLOCKED; 
    369455                goto switch_thread; 
    370456            } 
     
    9561042            goto execute; 
    9571043 
    958 #ifdef UR_CONFIG_THREADS 
     1044#ifdef UR_CONFIG_TASKS 
    9591045        case CC_TERM: 
    960             if( ut->nextThread == ut ) 
     1046            if( ut->nextTask == ut ) 
    9611047                goto finish; 
    962 term_thread: 
     1048term_task: 
    9631049            { 
    9641050            UThread* thr = ut; 
    965             ut->state = UR_THREAD_TERM; 
    966             ut = ur_threadNextReady( ut ); 
     1051            ut->state = UR_TASK_TERM; 
     1052            ut = ur_taskNextReady( ut ); 
    9671053            if( ! ut ) 
    9681054                goto error; 
    969             ur_threadUnlink( thr ); 
     1055            ur_taskUnlink( thr ); 
    9701056            ur_threadFree( thr ); 
    9711057            } 
     
    9741060 
    9751061        case CC_END: 
    976 #ifdef UR_CONFIG_THREADS 
    977             if( ut->nextThread != ut ) 
    978                 goto term_thread; 
     1062#ifdef UR_CONFIG_TASKS 
     1063            if( ut->nextTask != ut ) 
     1064                goto term_task; 
    9791065#endif 
    9801066            goto finish; 
  • branches/thune/thread_safe/tokenize.c

    r427 r431  
    299299 
    300300/** 
    301  \return  Returns block index or zero if an error is thrown. 
     301 \return  Block index or zero if an error is thrown. 
    302302*/ 
    303303UIndex ur_tokenize( UThread* ut, const char* it, const char* end ) 
  • branches/thune/thread_safe/unix/os.h

    r387 r431  
    5959 
    6060 
     61typedef pthread_t           OSThread; 
    6162typedef pthread_mutex_t     OSMutex; 
    6263 
    63 #define mutexInit(mh)        pthread_mutex_init(&mh,0) 
    64 #define mutexFree(mh)        pthread_mutex_destroy(&mh) 
    65 #define mutexLock(mh)        pthread_mutex_lock(&mh) 
    66 #define mutexUnlock(mh)      pthread_mutex_unlock(&mh) 
     64#define mutexInitF(mh)      (pthread_mutex_init(&mh,0) == -1) 
     65#define mutexFree(mh)       pthread_mutex_destroy(&mh) 
     66#define mutexLock(mh)       pthread_mutex_lock(&mh) 
     67#define mutexUnlock(mh)     pthread_mutex_unlock(&mh) 
    6768 
    6869 
  • branches/thune/thread_safe/urlan.c

    r427 r431  
    411411    int i; 
    412412 
     413    //printf( "KR sizeof(pthread_t) = %lu\n", sizeof(pthread_t) ); 
     414    //assert( sizeof(UCellThread) == 16 ); 
     415 
    413416    assert( sizeof(UCell) == 16 ); 
    414417    //assert( sizeof(LocalFrame) == 16 ); 
     
    451454  \param customCount  Number of UDatatype in custom array. 
    452455 
    453   \returns UR_EVAL_ code. 
     456  \return  UR_EVAL_ code. 
    454457*/ 
    455458int ur_startup( UrlanEnv* env, UDatatype* custom, int customCount ) 
     
    467470    env->dtCount  = UT_BI_COUNT + customCount; 
    468471 
    469     ur_arrayInit( &env->atoms,     sizeof(AtomRec),      4096 ); 
    470     ur_arrayInit( &env->atomNames, sizeof(char),         2048 ); 
     472    ur_arrayInit( &env->atoms,     sizeof(AtomRec),      2048 ); 
     473    ur_arrayInit( &env->atomNames, sizeof(char),         2048 * 6 ); 
    471474    ur_arrayInit( &env->devices,   sizeof(UPortDevice*),    8 ); 
    472475    ur_rebuildAtomHash( &env->atoms ); 
     
    485488    // Init main thread. 
    486489 
    487     if( mutexInit( env->mutex ) == -1 ) 
    488     { 
    489 #ifndef _WIN32 
     490    if( mutexInitF( env->mutex ) ) 
     491    { 
     492#ifdef _WIN32 
     493        //GetLastError(); 
     494#else 
    490495        perror( "mutexInit" ); 
    491496#endif 
     
    495500    ut = ur_threadMake( env, 512, 1024 ); 
    496501 
    497     // Make hold & global context blocks. 
    498     ur_makeBlock( 8 );        // 0 - BLK_THREAD_HOLD 
    499     ur_makeBlock( 1024 );     // 1 - BLK_GLOBAL_WORD 
    500     ur_makeBlock( 1024 );     // 2 - BLK_GLOBAL_VAL 
    501     ur_makeBlock( 1 );        // 3 - BLK_CTX_STACK 
    502     //ur_makeBlock( 0 );        // 4 - BLK_DSTACK 
    503  
    504     ur_pushContext( ut, (UCell*) &ur_thrGlobal ); 
    505  
    506     { 
    507     //UBlock* blk = ur_tblockPtr( BLK_DSTACK ); 
    508     } 
    509502 
    510503    // Add datatypes first so UT_ defines match the index & atom. 
  • branches/thune/thread_safe/urlan.h

    r427 r431