"""Packetised serial text transmit/receive test program (testTxD3_pico.py).

by D@CC, dated 2024BFeb13.  Note: as of that date, partially tested.

All functions and subroutines are at the top of the program:
  1. bChar()        converts the 4-character text of a byte (b'z') into 1 char
  2. serTx_pico()   transmits a full serial text message
  3. txParse()      parses the outgoing text message and calls txChar()
  4. txChar()       echoes a char that is being added to oP (the outgoing packet)
  5. sendThisPack() actually transmits a full packet (without any EOM)
  6. sendLastPack() actually transmits the final packet (with at least one EOM)
  7. serRx_pico()   receives a full serial text message
The script body after the functions transmits test messages and then
tries to receive a message.

Calling parameters:
  bChar(strB)
  serTx_pico(uartObject, bps=300, txtToBeSent="Hello,World", rxtxEndChar="]", pktSize=16)
  txParse(uartObject, thisMsg, bps=300, msgFormat="SMEPX", T5C=-1, pktSize=16, eom="]")
  txChar(char)
  sendThisPack(uartObject, opIn, cntPadding, eom, pktSize)
  sendLastPack(uartObject, opIn, cntPadding, eom, pktSize)
  serRx_pico(uartObject, bps=600, timeOut=-1, eom="]")

NOTE(review): the IX* modules imported below replace utime / machine / UART
in this version of the file (lines originally tagged "#IX"); they look like
emulation stand-ins -- confirm against the IX* sources.
"""
import os
#import utime
#from machineIX import UART, Pin
import IXutime as utime
import IXmachine as machine
import IXled_onboard as IXled_onboard
import IXuart as uart


# Start-of-message character that pairs with each supported end-of-message
# character.  Any other EOM falls back to "[".
_SOM_FOR_EOM = {"]": "[", ")": "(", "}": "{", ">": "<"}


def Pin(s1):  #IX
    """Stub for machine.Pin: accepts a pin number and returns None."""
    return  #IX


def bChar(strB):
    """Return the character inside the text form of a 1-byte bytes object.

    Given a string like "b'z'" this returns "z" (the character at index 2).
    Raises IndexError when strB is shorter than 3 characters.
    Tested by D@CC on 2021FJun15.
    """
    return strB[2]
#end def


# ---------------------------------------------------------------------------
# Protocol notes (msgOut / msgFormat letters)
#
#   A is the letter "A" meaning "16 character packets (pktSize)"
#   S is SOM
#   M is the Message text to be sent (will be followed by EOM)
#   E is EOM
#   P is the padding characters (enough EOMs to fill the packet)
#   X is t10c (ten character time delay: roman numeral X)
#   C is t100c (100 character time delay: roman numeral C)
#
# Most messages will have msgOut = "AC" "SMEPX".
# Between messages, the transmitter will send "0" every minute indicating
# that it is alive but has no message to send.
# Before sending each message the transmitter will send a single "A" (0x41)
# indicating that the following message will be in packets of 16 characters.
# In the future, when the packet size changes to 32 characters, the
# transmitter will precede each message with a single "B" indicating that
# each packet will contain 32 characters.  This will allow future packet
# sizes at least as big as 16x16x4 = 1024 characters and total message
# lengths of around 32768 without using any control characters in the
# "ascii" preamble.
#
# Future versions will have msgOut = "DSNKMEPX" with packets having the
# format:   AX SNKMV MV MV etc MEPX
# The optional packet of 16 x D permits synchronization, where
#   D is a full packet of Delete characters (optional, for synchronization)
#   N is the 2-character packet Number
#   K is the 2-character message character "Kount" including only the
#     first final EOM
#   V is t5c (5 character time delay: roman numeral V)
#
# When a receiver starts up (and between messages) it should start by
# expecting packets of single characters.  It will read a single 8-bit
# character expecting it to be a "0", meaning an idle transmitter.  If it is
# not a "0", it will read another single 8-bit character expecting it to be
# an "A".  If it is not an "A", the character will be discarded, but the
# receiver will presume that it is reading a character stream that is
# mid-message.  It will read single 8-bit characters searching for a "]",
# which indicates that this is the last packet of the message.  It will keep
# reading and discarding all the "]" that it sees.  After the last "]", the
# next 8-bit character should be a "0" or an "A".  The receiver should keep
# reading and discarding the "0" characters.  When an "A" is finally received
# (while reading single 8-bit characters), the receiver knows that subsequent
# packets will be packets of 16 characters.  The receiver is now fully
# synchronized with the transmitter.  If something other than an "A" is
# received, the receiver should begin looking for a "]" (meaning "end of
# message") again.  If the state of "being in mid-message" continues 10
# times, a human operator should be informed that this ERROR is occurring.
# ---------------------------------------------------------------------------
def serTx_pico(uartObject, bps=300, txtToBeSent="Hello,World", rxtxEndChar="]", pktSize=16):
    """Transmit one full text message as fixed-size packets.

    Every parameter has a default except uartObject, which must already
    exist, e.g.
    uart = machine.UART(1, 300, parity=None, stop=1, bits=8, tx=Pin(txPin), rx=Pin(rxPin))

    uartObject  -- object whose write() method sends the packets
    bps         -- baud rate; sets the inter-message delay used by txParse()
    txtToBeSent -- message text (should not contain the EOM character)
    rxtxEndChar -- end-of-message (EOM) character, usually "]"
    pktSize     -- number of characters per packet

    Returns True.
    """
    print("54 in serTx_pico txtToBeSent :"+txtToBeSent+":")
    msgOut = """ #S + #M + #E + #P + #C"""
    print("74 msgOut :",msgOut,":")
    # Send the message.  The caller's uartObject and settings are forwarded;
    # earlier code passed the module-level uart and dropped bps, pktSize and
    # rxtxEndChar, so non-default values were silently ignored.
    txParse(uartObject, txtToBeSent, bps=bps, pktSize=pktSize, eom=rxtxEndChar)
    return True
#end def serTx_pico()


def txParse(uartObject, thisMsg, bps=300, msgFormat="SMEPX", T5C=-1, pktSize=16, eom="]"):
    """Build and send the packets for one message according to msgFormat.

    uartObject -- the uart being used (passed on to sendThisPack/sendLastPack)
    thisMsg    -- the full text message to be sent.  It should not contain
                  the EOM character; any found in the message become "?".
                  An EOM is added after the last character of the message,
                  so do not put the EOM in the message.
    bps        -- the baud rate (bits/sec), used to derive the time delays
    msgFormat  -- the form of the message, e.g. "SMEPX":
                    S  queue the header: SOM + the packet size as 2 digits
                    M  send the message text; the final packet is padded
                       with EOMs up to pktSize, so the "E" (first EOM) and
                       "P" (padding) letters need no separate action here
                    X  sleep for t10c (ten character times)
    T5C        -- time of 5 characters in seconds; -1 means 50/bps
    pktSize    -- characters per packet (defaults to 16)
    eom        -- the EOM character (usually "]")

    Raises ValueError when pktSize is smaller than 1 or smaller than the
    header.  Returns None.
    """
    if pktSize < 1:
        raise ValueError("pktSize must be at least 1")

    # t10c is the only delay used below (message part "X").
    if T5C == -1:  # the calling routine didn't specify T5C
        t10c = 100/bps
    else:
        t10c = T5C*2

    # Previously som was only defined for eom == "]" (NameError otherwise).
    som = _SOM_FOR_EOM.get(eom, "[")

    oP = ""                  # outgoing packet being assembled, initially empty
    lenM = len(thisMsg)+1    # length of thisMsg plus 1 for the EOM character
    cntUsed = 0              # number of characters of thisMsg used so far

    for msgPart in msgFormat:  # do each message segment
        if msgPart == "S":
            # Header: SOM then the packet size (character count).  In future
            # these 2 characters will contain parity or CRC.  For the default
            # pktSize of 16 this is "[16".
            for headerChar in som + "%02d" % pktSize:
                oP = oP+txChar(headerChar)
            if len(oP) > pktSize:
                raise ValueError("pktSize is too small to hold the header")
            cntUsed = 0
        elif msgPart == "X":
            # a time delay doesn't send any characters
            utime.sleep(t10c)
        elif msgPart == "M":
            # lenM includes the EOM, so the loop runs until the final
            # (EOM-bearing) packet has been sent.
            while lenM-cntUsed >= 1:
                cntToAdd = pktSize-len(oP)      # remaining space in oP
                cntRemainOfM = lenM-cntUsed-1   # remaining chars of the msg
                if cntRemainOfM >= cntToAdd:
                    # This packet cannot hold the rest of the message plus an
                    # EOM, so send a full packet of data without padding.
                    # ">=" (not ">"): a remainder that exactly fills the
                    # packet leaves no room for the EOM, which must then go
                    # out in a further, all-padding packet.
                    opToSend = oP+thisMsg[cntUsed:cntUsed+cntToAdd]
                    sendThisPack(uartObject, opToSend, 0, eom, pktSize)
                    # Advance by exactly what was sent.  An extra +1 here
                    # used to drop one message character per full packet
                    # (the 'o' of "Roy" in the Hello test message).
                    cntUsed = cntUsed+cntToAdd
                    oP = ""
                else:
                    # The packet can hold the whole remainder; fill the rest
                    # with EOMs (at least one).
                    opToAdd = thisMsg[cntUsed:cntUsed+cntRemainOfM]
                    cntPadding = pktSize-len(oP)-cntRemainOfM
                    opToSend = oP+opToAdd+eom*cntPadding
                    sendLastPack(uartObject, opToSend, cntPadding, eom, pktSize)
                    cntUsed = cntUsed+len(opToAdd)
                    oP = ""
                    break  # out of the while: the message is complete
            #end while
        #end if
    #end for
    # all msg parts have now been sent
    return  # from parsing and transmitting
#end def txParse


def txChar(char):
    """Echo char to the console (no line feed) and return it unchanged.

    This simply records that "char" is being queued; it doesn't actually
    send the char.
    """
    print(char, end='')  # "end" suppresses the line feed #IX
    return char
#end def txChar


def sendThisPack(uartObject, opIn, cntPadding, eom, pktSize):
    """Write one full, unpadded packet to uartObject.

    Prints an ERROR line when len(opIn) differs from pktSize.  The packet
    must not contain the EOM character (EOM is only in the last packet), so
    any EOM found is replaced by "?" after printing a WARNING.
    cntPadding is accepted for signature symmetry with sendLastPack() and is
    not used.  Returns pktSize.
    """
    if len(opIn) != pktSize:
        print("ERROR",pktSize, "is not the size of the packet to send:",len(opIn))
    oP = opIn
    if eom in opIn:
        print("WARNING E in serTx", eom, "found in message")
        oP = opIn.replace(eom, "?")
    uartObject.write(oP)
    print("377 tx sent :"+oP+":")  # keep user informed during testing
    return pktSize
#end def


def sendLastPack(uartObject, opIn, cntPadding, eom, pktSize):
    """Write the final packet of a message (data followed by EOM padding).

    The last cntPadding characters of opIn are the EOM padding.  Only the
    part before the padding is searched for stray EOM characters; those are
    replaced by "?" after printing a WARNING.  The padding itself is sent
    untouched (replacing across the whole packet used to wipe out the
    terminating EOMs as well).
    Prints an ERROR line when len(opIn) differs from pktSize.
    Returns pktSize.
    """
    if len(opIn) != pktSize:
        print("ERROR",pktSize, "is not the size of the packet to send:",len(opIn))
    splitAt = max(0, len(opIn)-cntPadding)
    body = opIn[:splitAt]
    padding = opIn[splitAt:]
    if eom in body:
        print("WARNING E in serTx", eom, "found in message")
        body = body.replace(eom, "?")
    oP = body+padding
    uartObject.write(oP)
    print("405 tx sent:"+oP+":")  # keep user informed during testing
    return pktSize
#end def


def mpyRead(uart):
    """Read at most one character from IXuart and return it as text.

    Returns "" when IXuart.any() reports nothing waiting.
    NOTE(review): the uart argument is not used; the IXuart module is
    always read instead -- confirm this is intended.
    """
    import IXuart as IXuart
    print("415 into mpyRead")
    txtRead = ""
    if IXuart.any():  #IX read once per call
        aByte = str(IXuart.read(1))
        show("aByte:", aByte)
        txtRead = txtRead+bChar(aByte)
    return txtRead
#end def


def serRx_pico(uartObject, bps=600, timeOut=-1, eom="]"):
    """Receive text, one character per poll, until a timeout or 6+ characters.

    uartObject -- not used; a UART is created here via machine.UART(1, bps, ...)
    bps        -- baud rate for that UART and for the default timeout
    timeOut    -- timeout in seconds; -1 means int(1000/bps + 1), i.e. 100
                  character times and at least 1 second
    eom        -- not used

    Prints an ERROR line when nothing was received and a WARNING line when
    the received text contains "?".  Returns the received text.
    """
    uart = machine.UART(1,bps, parity=None, stop=1, bits=8, rx=Pin(5), tx=Pin(4))  #IX
    print("my uart:",uart)
    if timeOut == -1:
        t100c = 1000/bps
        timeOutUse = int(t100c+1.0)  # at least 1 sec
    else:
        timeOutUse = timeOut  # in seconds
    now = utime.time()
    timeFail = now+timeOutUse
    oP = ""  # reset input buffer
    show("time:",now)
    show("timeFail:",timeFail)
    while now < timeFail:
        # keep reading any 1 character packets until timeOut
        print("458 I should be flashing the green LED while receiving")
        # I don't know if it is still receiving
        # it should write the received msg to a file.
        print("461 any")
        strA = mpyRead(uart)
        show("strA:",strA)
        oP = oP+strA
        show("465 oP:",oP)
        now = utime.time()
        now = now+1  #IX
        show("time",now)
        if len(oP) > 5:
            break  #IX break out of while loop
    #end while
    if oP == "":
        print("ERROR T in serRx (): timeOut after",timeOutUse,"seconds")
    elif "?" in oP:
        print("WARNING '?' seen in message received.")
    return oP
#end def serRx_pico(uart,300,timeOut(in secs))


def show(strA, parmA):
    """Print a label followed by a value (debugging aid)."""
    print(strA,parmA)
    return
#end def


##########################################################################################
##########################################################################################
progName = "testTxD3_pico.py"
print("496 progName:"+progName)

# Switches for optional test sections (all were disabled with "if False:").
RUN_EXTRA_TX_TESTS = False   # send the additional transmit test messages
RUN_RAW_RX_DUMP = False      # print every waiting character, one at a time
RUN_RAW_RX_SINGLE = False    # read a single waiting character into charIn

uart.write("501 zzz")

bps = 60
tPause = 4
txPin = 5
rxPin = 4

#print sys info
print(os.uname())  # os, utime and machine were imported at top

#indicate program started visually (by blinking the green LED)
led_onboard = machine.Pin(25, machine.Pin_OUT)
IXled_onboard.value(0)  # onboard LED OFF for 0.5 sec
utime.sleep(0.5)
IXled_onboard.value(1)  # leaving it ON

#create uart object sample code:
#self.uart = UART(1, 115200, parity=None, stop=1, bits=8, rx=Pin(rxPin), tx=Pin(txPin))
#create actual uart object
#uart = machine.UART(1,bps, parity=None, stop=1, bits=8, rx=Pin(5), tx=Pin(4)) #IX
#print uart info
print(uart)  # during testing

# A single "A" (0x41) is sent before each message: ("A"-0x40) * 16 is the
# packet length, which is 16.  Every group of messages is preceded by a
# single "A".
# Prior to the "A", the transmitter should send a synchronizing sequence to
# permit the (human) receiver to "guess" or "determine" the baud rate (the
# bps); hopefully the operator of the transmitter and receiver is the same
# human.  The serRx_pico should be running in a separate thread.

print("547 transmitting Hello msg on Pin(GP4) in 2 16 byte packets at")
print("548 60 bps, continuously preceded by an 'A' at 2 sec intervals")
# Issue 01 (a message character was skipped after each full packet) is fixed
# in txParse(), so it is no longer announced here.
print("550 Issue 02: it includes the '[16' in the msg len count of 16. Doh!")
# Packets sent for "Hello,JoeMaxRoyBob":
# [16Hello,JoeMaxR
# 1234567890123456
# oyBob]]]]]]]]]]]
print("557 Hit ^C to stop!")
print()
while True:
    try:
        uart.write("A")
        utime.sleep(tPause)
        serTx_pico(uart,bps, txtToBeSent="Hello,JoeMaxRoyBob")
        utime.sleep(tPause)
    except KeyboardInterrupt:
        print("574 interrupted by ^c")
        break
    except Exception as err:
        # Still leave the loop on any other failure (as before), but report
        # what actually happened instead of claiming it was a ^C.
        print("574 stopped by error:", repr(err))
        break
    finally:
        # runs after every pass, including the pass that breaks out
        print("577 after finally")
        aReply = input("579 Hit Enter to Continue")
    #end try
#end while

if RUN_EXTRA_TX_TESTS:
    serTx_pico(uart,bps, txtToBeSent="H")
    serTx_pico(uart,bps, txtToBeSent="H")
    serTx_pico(uart,bps, txtToBeSent="H]")
    serTx_pico(uart,bps, txtToBeSent="[H]")
    serTx_pico(uart,bps, txtToBeSent="2.97")
    serTx_pico(uart,bps, txtToBeSent="99.987654")
    serTx_pico(uart,bps, txtToBeSent="ADC0:(0x4000,1622428309.000j)")
    serTx_pico(uart,bps, txtToBeSent="1234567890123")
    serTx_pico(uart,bps, txtToBeSent="12345678901234")
    serTx_pico(uart,bps, txtToBeSent="Hello,World")
    serTx_pico(uart,bps, txtToBeSent="1234567890123456789012345678901234567890")
#end if

# The above is the current format of the rxtx protocol.
# The som, eom and t10c are 1-character special codes; each must be
# converted into outgoing text or actions or both.  Normal text does not
# need to be converted before transmission.
#
# The txParse routine analyses the msgOut Format and separates it into
# special codes (text and actions).  Each special code is sent to the
# txChar() routine to be processed.  Each character of outgoing text is sent
# to the txChar() routine to be processed.
#
# The serTx_pico() routine is used to
#   transmit a special code of text ("S" and "M")
#   cause a time delay (V is t5c, X is t10c and M is t1000c)
#   transmit msg as 16-byte packets (msg should not contain "]", which is
#   replaced by "?")
#
# Early testing Format is "SMEPX".
#
# Note that the "P" also inserts at least 1 "]" character and enough extra
# "]" characters (or none) to make a message with an exact multiple of 16
# characters.  This is because there are always 16 characters sent per
# packet.  Each packet is followed by a time delay of t5c.  The last packet
# is always followed by a time delay of t10c.  These time delays permit the
# receiver to resync with the transmitter.  These time delays are never
# included in the character count of the message.
# The serRx_pico() routine times out after an interval of 1000 character
# times: t1000c

# print whatever has been received
bps = 600
t100c = 1000/bps
utime.sleep(t100c)  # wait for an interval of 100 char times

# This should be run as a separate thread
#uart = machine.UART(1,bps, parity=None, stop=1, bits=8, tx=Pin(txPin), rx=Pin(rxPin))
#uart = machine.UART(1,bps, parity=None, stop=1, bits=8, tx=Pin(4), rx=Pin(5))
#uart="machine.uart" #IX
print("636 machine.uart:",uart)
uart.write("642 yyy")

#IX
uart.write("hello")
utime.sleep(t100c)  # wait for an interval of 100 char times
uart.write("Hello,World")
utime.sleep(t100c)  # wait for an interval of 100 char times
utime.sleep(1)  # 1 sec

if RUN_RAW_RX_DUMP:
    print("receiving mpy routine")
    while uart.any():
        print(uart.read(1))
    #end while
#end if

charIn = ""
if RUN_RAW_RX_SINGLE:
    print("receiving mpy routine")
    if uart.any():
        charIn = uart.read(1)
    #end if
    print("end of mpy routine")
#end if
show("charIn:",charIn)

print("receiving my routine")
msgIn = serRx_pico(uart,bps=600)  # times out after t100c seconds (100 char-times)
print("msgIn:",msgIn)
print("675 end of serRX()")
print("676 end of:",progName)  #IX
#end /testTxD3_pico.py