#**************************************** serTxRx_Fast_pico.txt ****************************************
#by D@CC
#on 2022EMay30
import os
import utime
from machine import UART, Pin
#calls:
#bChar()
#setTBuff()
#os.uname()
#serTXTx_pico()
#timeOut()
#machine.Pin()
#utime.time()
#utime.sleep()
#led.onboard.value()
#UART()
#uartObject.write()
#uartObject.any()
#uartObject.read()
#uart.any()
#print("unfixed ERROR thinks msg is fully received at 1500 (sees a ']' at 35th byte")
progName="serTxRx_Fast_pico.py"
bps=1500 # baud (bits/sec) rate
# Results observed while varying bps:
#   bps    First Att/ saw    Second Attp saw
#   3000   32   0
#   1700   32   all
#   1525   32   all
#   1525   -    hung
#   1500   32   lost in mid 2
#   1440   garbage   hung
#   1440   all  all
#   1350   all  all
#   1000   all  all
#    500   all  all
#    300   -    hung
#    300   all  all

# Delays (in seconds) derived from the baud rate.  The rest of the script
# uses t10c as a "10 character" pause, i.e. it counts 10 bits per character;
# t1000c is 100 times longer and is used as the receive time limit.
t10c=100/bps
t1000c=10000/bps


def setTBuff(bps,minTBuff=5,maxTBuff=15):
    """Return a buffer pause (seconds) of 1000/bps, clamped to a range.

    bps      -- baud rate in bits per second (must be non-zero)
    minTBuff -- smallest pause that is ever returned
    maxTBuff -- largest pause that is ever returned

    The result is minTBuff or maxTBuff themselves when the computed value
    falls outside the range, otherwise the float 1000/bps.
    """
    tBuff=1000/bps
    # NOTE(review): this function arrived garbled as "if tBuffmaxTBuff:";
    # the lower-bound test was reconstructed.  It agrees with the comment
    # "pause time set to 5 seconds" found further down the script.
    if tBuff<minTBuff:
        tBuff=minTBuff
    #end if
    if tBuff>maxTBuff:
        tBuff=maxTBuff
    #end if
    return tBuff
#end def


tBuffer=setTBuff(bps)
print("tBuffer:",tBuffer)
errThreshold=99            # max number of receive passes before giving up
rxtxEndChar="]"            # last character of every test message
isUsingFirstAttempt=True
isUsingSecondAttempt=True
# tested OK on 2021FJun15 by D@CC
# the b'H' has been converted to "H"
# backed up as serTxRx_pico_BU2021FJun15-1702.py
#based on progName="mpy_uart_pico.py"
print("progName:"+progName)
print("Error occurs when using pins 8 and 9 as Rx and Tx")
txPin=4
rxPin=5
print("Jumper GP"+str(rxPin)+" to GP"+str(txPin))
#print sys info
print(os.uname())
#indicate program started visually
# Pin is imported by name above; the bare module name "machine" is never
# imported, so machine.Pin(...) would raise NameError.
led_onboard = Pin(25, Pin.OUT)
led_onboard.value(0)   # onboard LED OFF for 0.5 sec
utime.sleep(0.5)
led_onboard.value(1)
#create uart object
# Notes kept from a disabled experiment:
#self.uart = UART(1, 115200, parity=None, stop=1, bits=8, rx=Pin(rxPin), tx=Pin(txPin))
# speed of 400 only reads 4 bytes . . Doh?
# bps=9000
#tBuff=setTBuff(bps)
# UART and Pin are the names imported from machine at the top of the file;
# the bare module name "machine" is never imported, so machine.UART(...)
# would raise NameError.
uart = UART(1, bps, parity=None, stop=1, bits=8, tx=Pin(txPin), rx=Pin(rxPin))
#print uart info
#uart = machine.UART(1)
print(uart)


def bChar(strB):
    """Return the first character held in the text form of a bytes object.

    strB is the result of str() on a bytes value, e.g. "b'z'" gives "z".
    An empty value ("b''"), or text that is not a bytes representation at
    all (such as "None"), gives "".  Escaped bytes are decoded, so
    "b'\\n'" gives a newline and "b'\\x07'" gives chr(7), instead of the
    bare backslash that plain indexing would produce.
    """
    # tested by D@CC on 2021FJun15
    # The shortest text that carries a payload is b'x' (4 characters).
    if len(strB) < 4 or strB[0] != "b" or strB[1] not in ("'", '"'):
        return ""
    #end if
    body=strB[2:-1]          # text between the quotes
    if body[0] != "\\":
        return body[0]
    #end if
    esc=body[1:2]            # character following the backslash
    if esc == "x":
        try:
            return chr(int(body[2:4], 16))
        except ValueError:
            return "\\"
    #end if
    simple={"n": "\n", "r": "\r", "t": "\t", "\\": "\\", "'": "'", '"': '"'}
    return simple.get(esc, "\\")
#end def


def serTxRx_pico(uartObject,bps, rOrT="R",txtToBeSent="[UUUUUUUU]",tBuff=3,rxtxEndChar="]"):
    """Transmit, drain or receive text on an already-created UART object.

    uartObject  -- object providing write(), any() and read()
    bps         -- baud rate; only used to size the pre-receive pause
    rOrT        -- "T" transmit, "R" receive, "X" drain whatever is waiting
    txtToBeSent -- text written in "T" mode ("" sends "[HelloThere]")
    tBuff, rxtxEndChar -- accepted for the callers' benefit; not used here

    Returns None in "T" mode.  In "X" mode returns the last byte read, or
    "" when nothing was waiting.  In "R" mode returns the characters that
    were waiting ("" when there were none).  Any other mode returns "?".

    Raises SystemExit in "R" mode when nothing was received and the read
    took longer than 31.4159 seconds.
    """
    t10c=100/bps
    # requires a uart object to be created before this function is called
    #uart = machine.UART(1,600, parity=None, stop=1, bits=8, tx=Pin(txPin), rx=Pin(rxPin))
    #
    #Note: there appears to be an internal error in either the Tx or the Rx function. This
    # error causes messages (simple text strings) longer than 32 bytes to be chopped, resulting
    # in a loss of data no matter how long the program waits for the remainder of the message
    # to arrive. What is really strange is that this issue is speed-dependent at both extremes
    # of speed. The result is that the last character of the message ("]") might never arrive
    # or in some cases, characters in the middle of the message have been lost. This issue
    # would disappear if an ACK/NAK protocol were used, but this level of sophistication should
    # NOT be necessary for communication between two boards that are only inches apart.
    #
    # To further explore this issue, a wait count (called an "Error Threshold": errThreshold) is
    # used by the calling code to count how many times it receives an empty message before it
    # "gives up" and terminates with an error note. As this Threshold is raised, a second
    # threshold (a time-out) is used after which an error note is received. This is complicated
    # by the fact that unix times on the pico do NOT seem to be measured in intervals less than
    # a second. Precision in milliseconds would be needed to work on this issue. Exceeding a time
    # limit of 31.4159 seconds will also result in program termination with a different error
    # message.
    # Another complication is that this issue is intermittent, and cannot always be reproduced.
    # This makes me wonder if something similar to "RAM garbage collection" might be in play. We
    # have been told that "RAM garbage collection" does NOT occur in the Pico. Doh!
    #
    # NOTE(review): 32 bytes looks like the depth of the RP2040 UART hardware FIFO, so the
    # loss may simply be the receive FIFO overflowing while this code sleeps -- confirm.
    # Newer MicroPython firmware is reported to accept rxbuf= in the UART constructor, and
    # utime.ticks_ms()/ticks_diff() give millisecond timing -- verify against the docs.
    #
    #errThreshold of 99 works at low baud rates (even down to 200 bps)
    # tested by D@CC with MPA board down to 20 with errThreshold=99
    # sending longer messages (>32 characters) cause data loss by the receiver
    # apparently very slow and very fast baud rates cause problems with 60 character msgs
    # D@CC saw it fail at 450 bps sometimes (but "usually" works at 400)
    # testing was done on an RPi model 4 with a RP2040 pico
    # D@CC doesn't yet know what the high baud rate limit is

    if rOrT=="T":
        txtOut=txtToBeSent
        if txtOut=="":
            txtOut="[HelloThere]"
        #end if
        print("B1 transmitting:"+txtOut+":")
        uartObject.write(txtOut)
        utime.sleep(0.1)
        print("transmission complete")
        return
    #end if

    if rOrT=="X":
        # discard everything that is waiting, remembering only the last byte
        txtIn=""
        while uartObject.any():
            txtIn=uartObject.read(1)
        #end while
        if txtIn!="":
            print(txtIn)
        #end if
        return txtIn
    #end if

    if rOrT=="R":
        utime.sleep(t10c)   #wait for 10 char to be received
        txtMsg=""
        tBeg=utime.time()
        # Loop while unread characters are waiting.  The UART that was passed
        # in is used (not the module-level "uart") so the function works with
        # whichever object the caller supplies.
        while uartObject.any():
            # Read "one character at a time": asking for more may wait
            # internally until that many characters have arrived.
            byteOne=uartObject.read(1)
            # bChar() turns the text form b'h' into the character "h"
            txtMsg=txtMsg+bChar(str(byteOne))
            utime.sleep(0.01)
        #end while
        dT=utime.time()-tBeg
        if txtMsg=="" and dT>31.4159:
            print(" ERROR B wait time exceeds 31.4159 seconds")
            # The original statement here was the bare name "exit", which
            # calls nothing; raise SystemExit so the program really stops.
            raise SystemExit
        #end if
        return txtMsg
    #end if

    # unknown mode
    return "?"
#end def


def timeOut(t0,tOut):
    """Return True when more than tOut seconds have passed since t0.

    t0 is a value previously obtained from utime.time().
    """
    return (utime.time()-t0)>tOut
#end def

####################################################################################
#transmit the default text (then print and return any received text)
#serTxRx_pico(uart,bps,"T","[UUUUUUUUUUUUUUUUUUUUUUUUAUU30123456789UUUUUUUUUUU]")
# the program executes so fast that we must wait for the message
# to be fully sent before we start reading the message.
# if we sleep(0), we will only see the final "o" Doh!
# Loop-back test message; it is sent once per attempt.
#testMsg="[UUUUUUUUUUUUUUUUUUUUUUUUAUU3012345678901234567890]"
testMsg="[UUUUUUUUUUUUUUUUUUUUUUUUAUU301234567890abcdefghij]"

# ---------------------------------------------------------------- first attempt
# Send the message, then repeatedly call serTxRx_pico(...,"R") and append
# whatever it returns until the text ends with rxtxEndChar.
if isUsingFirstAttempt==False:
    print("skipping first attempt at coding this.")
else:
    print("First attempt at coding this.")
    serTxRx_pico(uart,bps, "T",testMsg)
    print("receiving 2 t10c:",t10c)
    utime.sleep(t10c)   # throttle transmission by 10 character pause
    kk=0                # number of receive passes made so far
    partMsg=""
    tBeg=utime.time()
    print("receiving 2 t10c:",t10c)
    # keep reading while the message is empty or its last char is not "]"
    while not partMsg.endswith(rxtxEndChar):
        dT=utime.time()-tBeg
        if dT>t1000c:
            print("dT:",dT)
            print("Error rX delay > ",t1000c,"secs")
            break   #out of while loop
        #end if
        kk=kk+1
        partMsg=partMsg+serTxRx_pico(uart,bps,"R",tBuff=tBuffer)
        if kk>errThreshold:
            print("Error F -missing ']' at end of reading")
            break   #out of while loop
        #end if
    #end while
    print("bps:",bps)
    print("Msg received 1st Try:"+partMsg+":")
#end if

print("bps:",bps)
pauseN=tBuffer   #pause time set to 5 seconds
print("tBuffer:",tBuffer)
# NOTE(review): the pause actually taken here is t10c; pauseN is only
# printed by the second attempt below -- confirm which one was intended.
utime.sleep(t10c)   #let the buffers settle down (10 char delay)

# --------------------------------------------------------------- second attempt
# Read the UART directly, one byte at a time, without going through
# serTxRx_pico(...,"R").
if isUsingSecondAttempt :
    print()
    print("pausing to wait for buffers to settle:",pauseN,"secs")
    print("isUsingSecondAttempt")
    print("First . . clear out any residual message:")
    while uart.any():
        print(bChar(str(uart.read(1))), end='')
    #end while
    print()
    print("end of search for residual character(s)")
    print("transmitting")
    serTxRx_pico(uart,bps, "T",testMsg)
    print("t10c:",t10c)
    #utime.sleep(t10c) #10 char delay to allow the transmission to be sent
    print("receiving 2nd attempt")
    partMsg=""
    tBeg=utime.time()
    print("rxtxEndChar:"+rxtxEndChar)
    while True:
        if uart.any():
            partMsg=partMsg+bChar(str(uart.read(1)))
            if partMsg.endswith(rxtxEndChar):
                #That was the "end" character=="]"
                print(" partMsg:"+partMsg+":")
                break
            #end if
        elif timeOut(tBeg,t1000c):
            # Nothing is waiting and the time limit has passed.  Without this
            # check a lost "]" would keep the loop spinning forever.  No sleep
            # is taken on this idle path so that arriving bytes are picked up
            # as soon as possible.
            print(" partMsg:"+partMsg+":")
            print("timeOut exiting")
            break   #out of while
        #end if
    #end while
    print("endWhile")
#end if
print()
print("- bye -")
#Source:http://helloraspberrypi.blogspot.com/2021/01/raspberry-pi-picomicropython-simple.html
#program end
#********************************************************************************************
#/serTxRx_Fast_pico.py