20. 다중 상속

21. 추상클래스

22. 예외처리

23. file i/o

24. wxPython

25. SQLite

26. 원격 DB 연동

27. socket

28. Thread

29. 멀티 쓰레드 채팅 프로그램

30. pool

31. process

32. 웹 크롤링

33. HttpServer

34. CGI

35. 챗봇(chatbot)


20. 다중 상속 - 순서 중요

 * test_24

print("이전 작업 진행")
class Tiger:
    data = '호랑이 세상'
    
    def cry(self):
        print('호랑이 울음')
    def eat(self):
        print('고기 먹음')

class Lion:
    data = '사자 세상'
    
    def cry(self):
        print('사자 울음')
        
    def hobby(self):
        print('사자 낮잠')

class Liger(Tiger, Lion): # 먼저 명시한 부모를 먼저 인식
    pass

aa = Liger()
aa.cry()        # 호랑이 울음
aa.eat()        # 고기 먹음
aa.hobby()      # 사자 낮잠
print(aa.data)  # 호랑이 세상
print()

class Liger2(Lion, Tiger):
    data = 'Liger2 멤버 변수'
    
    def play(self):
        print("Liger2 고유 메소드")
        self.hobby()
        super().hobby()
        print(self.data)
        print(super().data)
bb = Liger2()
bb.cry()       # 사자 울음
bb.eat()       # 고기 먹음
bb.hobby()     # 사자 낮잠
print(bb.data) # Liger2 멤버 변수
bb.play()
# Liger2 고유 메소드
# 사자 낮잠
# 사자 낮잠
# Liger2 멤버 변수
# 사자 세상
class Animal:
    def move(self):
        pass
    
class Dog(Animal):
    name = '개'
    
    def move(self):
        print('개가 낮에 돌아다님')
    
class Cat(Animal):
    name = '고양이'
    
    def move(self):
        print('고양이가 밤에 돌아다님')
    
class Wolf(Dog, Cat):
    pass

class Fox(Cat, Dog):
    def move(self):
        print('여우가 돌아다님(오버라이딩)')
    
    def foxMove(self):
        print('여우 고유 메소드')
dog = Dog()
cat = Cat()
wolf = Wolf()
fox = Fox()

anis = [dog, cat, wolf, fox]
for a in anis:
    a.move()
# 개가 낮에 돌아다님
# 고양이가 밤에 돌아다님
# 개가 낮에 돌아다님
# 여우가 돌아다님(오버라이딩)

21. 추상클래스

추상클래스 : 추상 메소드를 가지는 클래스.
추상 메소드: 자식 클래스에서 부모의 추상메소드를 일반 메소드로 반드시 오버라이딩 하도록 강요.

 * test25_abstract

from abc import *

class TestClass(metaclass = ABCMeta): # 추상 클래스
    @abstractmethod
    def abcMethod(self):              # 추상 메소드
        pass
    
    def normalMethod(self):           # 일반 메소드
        print('일반 메소드')

tt = TestClass()   # TypeError: Can't instantiate abstract class TestClass with abstract methods abcMethod 
                   # 추상클래스가 추상메소드를 가질 경우 인스턴스 생성 시 error 발생.
                   # 추상클래스가 추상메소드를 가지지 않을 경우 인스턴스 생성 가능.
class Child1(TestClass): # 추상 클래스를 상속 받을 경우 추상메소드를 오버라이드 하지않을 경우 error 발생
    name = 'Child1'
    
    def abcMethod(self):
        print('Child1 추상 메소드를 오버라이딩하여 일반 메소드 생성')

# TypeError: Can't instantiate abstract class Child1 with abstract methods abcMethod
c1 = Child1()
c1.abcMethod()       # Child1 추상 메소드를 오버라이딩하여 일반 메소드 생성
c1.normalMethod()    # 일반 메소드
class Child2(TestClass):
    name = 'Child2'
    
    def abcMethod(self):
        print('Child2 추상 메소드를 오버라이딩하여 일반 메소드 생성')
        print('다른 클래스의 abcMethod와 이름은 같으나 다른 기능을 수행.')
        
    def normalMethod(self):
        print('부모의 normalMethod를 오버라이딩. 부모 메소드와 다른 역할 수행을 위해.')

c2 = Child2()
c2.abcMethod()      # Child2 추상 메소드를 오버라이딩하여 일반 메소드 생성
                    # 다른 클래스의 abcMethod와 이름은 같으나 다른 기능을 수행.
c2.normalMethod()   # 부모의 normalMethod를 오버라이딩. 부모 메소드와 다른 역할 수행을 위해.
print()

다형성 구현

my = c1
my.abcMethod()      # 1
                    #Child1 추상 메소드를 오버라이딩하여 일반 메소드 생성
my = c2
my.abcMethod()      # 1과 동일한 명령문이나 기능은 다름.
                    #Child2 추상 메소드를 오버라이딩하여 일반 메소드 생성
                    #다른 클래스의 abcMethod와 이름은 같으나 다른 기능을 수행.

클래스 연습문제 24-4

class Employee(metaclass= ABCMeta):
    def __init__(self, irum, nai):
        self.irum = irum
        self.nai = nai
    
    @abstractmethod
    def pay(self):
        pass
    
    @abstractmethod
    def data_print(self):
        pass
    
    def irumnai_print(self):
        print('이름 : ' + self.irum + ', 나이 : '+ str(self.nai), end = ', ')                     
                    
class Temporary(Employee):
    def __init__(self, irum, nai, ilsu, ildang):
        Employee.__init__(self, irum, nai)
        self.ilsu = ilsu
        self.ildang = ildang
        
    def pay(self):
        return self.ilsu * self.ildang; 
    
    def data_print(self):
        self.irumnai_print()
        print('월급 : ' + str(self.pay()))

t = Temporary('홍길도', 25, 20, 15000)
t.data_print() # 이름 : 홍길도, 나이 : 25, 월급 : 300000
class Regular(Employee):
    def __init__(self, irum, nai, salary):
        Employee.__init__(self, irum, nai)
        self.salary = salary
    
    def pay(self): # 추상 메소드 오버라이딩
        return self.salary; 
    
    def data_print(self):
        self.irumnai_print()
        print('급여 : ' + str(self.pay()))

r = Regular('한국인', 27, 3500000) # 이름 : 한국인, 나이 : 27, 급여 : 3500000
r.data_print()
class Salseman(Regular):
    def __init__(self, irum, nai, salary, sales, commission):
        Regular.__init__(self, irum, nai, salary)
        self.sales = sales
        self.commission = commission
    
    def pay(self): # 일반 메소드 오버라이딩
        return super().pay() + self.sales * self.commission; 
    
    def data_print(self):
        self.irumnai_print()
        print('수령액 : ' + str(self.pay()))
        
s = Salseman('손오공', 29, 1200000, 5000000, 0.25)
s.data_print() # 이름 : 손오공, 나이 : 29, 수령액 : 2450000.0

22. 예외처리

 * test26_try

try ~ except - 예상치 않은 에러에 대응하기 위한 문법

def divide(a, b):
    return a / b

c = divide(5, 2)
#c = divide(5, 0) # ZeroDivisionError: division by zero
print(c)
print()

try:
    c = divide(5, 2)
    #c = divide(5, 0)
    print(c)
    
    aa = [1, 2]
    print(aa[0])
    print(aa[1])
    #print(aa[2]) # IndexError: list index out of range
    
    f = open('c:/abc.txt') # FileNotFoundError: [Errno 2] No such file or directory: 'c:/abc.txt'
    
except ZeroDivisionError:
    print('두번째 인자에 0을 주지마세요')

except IndexError as e:
    print('Index Error : ', e)
except Exception as err:
    print("error 발생 : ", err)
finally:
    print('에러 유무에 관계없이 반드시 수행됨')

print('프로그램 종료')

 


23. file i/o

import os
print(os.getcwd())

try:
    print('파일 읽기')
    f1 = open(r'ftest.txt', mode ='r', encoding ='utf-8')
    print(f1) # <_io.TextIOWrapper name='ftest.txt' mode='r' encoding='utf-8'>
    print(f1.read())
    f1.close() # 파일 처리 종료시 close() : 사용했던 자원을 해제
    
    print('파일로 저장')
    f2 = open('ftest2.txt', mode = 'w', encoding = 'utf-8')
    f2.write('내 친구들\n')
    f2.write('홍길동 고길동\n')
    f2.close()
    print('저장 성공')
    
    print('파일에 내용추가')
    f2 = open('ftest2.txt', mode = 'a', encoding = 'utf-8')
    f2.write('손오공\n')
    f2.write('조조\n')
    f2.close()
    print('추가 성공')
    
    print('파일 읽기')
    f3 = open(r'ftest2.txt', mode ='r', encoding ='utf-8')
    print(f3.read()) # 전체 행 읽기
    #print(f3.readline()) # 한 행씩 읽기
    f3.close()
    
except Exception as e:
    print('에러 : ', e)

 - with문 사용 - close() 자동 수행

try:
    # 저장
    with open('ftest3.txt', mode = 'w', encoding = 'utf-8') as ff1:
        ff1.write('파이썬 문서 저장\n')
        ff1.write('내용\n')
        ff1.write('with문 사용으로 close() 하지않음\n')
    # 읽기
    with open('ftest3.txt', mode = 'r', encoding = 'utf-8') as ff2:
        print(ff2.read())
except Exception as e2:
    print('에러 발생', e2)

 - 피클링  : 복합 개체 처리(i/o)

import pickle

try:
    dicData = {'tom':'111-1111', 'james':'222-2222'}
    listData = ['마우스', '키보드']
    tupleData = (dicData, listData) # 복합객체
    
    with open('hi.dat', 'wb') as fobj:
        pickle.dump(tupleData, fobj)
        pickle.dump(listData, fobj)
    print('객체를 파일로 저장하기 성공')
    print('객체 읽기')
    with open('hi.dat', 'rb') as fobj2:
        a, b = pickle.load(fobj2)
        print(a) # {'tom': '111-1111', 'james': '222-2222'}
        print(b) # ['마우스', '키보드']
        c = pickle.load(fobj2)
        print(c)
except Exception as e3:
    print('error : ' + e3)

 * test28_file

 - 동 이름으로 우편번호와 주소 출력

try:
    dong = input('동이름 입력 : ')
    print(dong)
    with open(r'zipcode.txt', mode = 'r', encoding = 'euc-kr') as f:
        #print(f.read())
        line = f.readline()
        while line:
            lines = line.split('\t') # tab으로 구분
            #print(lines)
            if lines[3].startswith(dong): # 입력된 동 이름으로 시작되는 주소만 처리
                print('[' + lines[0] + '] '+ lines[1] + ' '+ lines[2] + \
                      ' '+ lines[3] + ' '+ lines[4])
            line = f.readline() # readline이 없을 때 까지 수행
            
except Exception as e:
    print('err',e)

 


24. wxPython

 - 설치

1 사이트 접속

wxpython.org/Phoenix/snapshot-builds/

 

Index of /Phoenix/snapshot-builds

 

wxpython.org

2 " wxPython-4.1.2a1.dev5103+4ab028d7-cp38-cp38-win_amd64.whl " 다운로드

 

3 anaconda prompt 실행

" cd C:\Users\wonh\Downloads\ " 입력(다운로드 파일 경로)

" dir wx* " 입력 후 하기 파일명 복사

" pip install wxPython-4.1.2a1.dev5103+4ab028d7-cp38-cp38-win_amd64.whl "입력

 

 * test29_gui1

# GUI (윈도우 프로그래밍)
# wxPython

import wx
# app = wx.App(False)
# frame = wx.Frame(None, wx.ID_ANY, 'Hi')
# frame.Show(True)
# app.MainLoop()

class Ex(wx.Frame):
    def __init__(self, parent, title):
        super(Ex, self).__init__(parent, title = title, size =(300, 300))
        
        # textBox 추가
        #self.txtA = wx.TextCtrl(self)
        #self.txtA = wx.TextCtrl(self, style = wx.TE_MULTILINE)
        
        # Label, textBox 사용
        panel = wx.Panel(self)
        wx.StaticText(panel, label = 'i d : ', pos = (5, 5))
        wx.StaticText(panel, label = 'pwd : ', pos = (5, 40))
        self.txtId = wx.TextCtrl(panel, pos = (40, 5))
        self.txtPwd = wx.TextCtrl(panel, pos = (40, 40), size = (200, -1))
        
        # Button : 디자인
        btn1 = wx.Button(panel, label = '버튼1', pos = (5,100))
        btn2 = wx.Button(panel, label = '버튼2', pos = (100,100))
        btn3 = wx.Button(panel, label = '종료', pos = (200,100))
        
        # Button에  대한 이벤트 처리 작업1
#         btn1.Bind(wx.EVT_BUTTON, self.btn1Method)
#         btn2.Bind(wx.EVT_BUTTON, self.btn2Method)
#         btn3.Bind(wx.EVT_BUTTON, self.btn3Method)
        
        # Button에  대한 이벤트 처리 작업2
        btn1.id = 1 # 각 버튼의 구분을 위해 id를 준다
        btn2.id = 2
        btn3.id = 3
        btn1.Bind(wx.EVT_BUTTON, self.OnClickMethod)
        btn2.Bind(wx.EVT_BUTTON, self.OnClickMethod)
        btn3.Bind(wx.EVT_BUTTON, self.OnClickMethod)
        
        self.Center()
        self.Show()
    
    def abc(self):
        print('일반 메소드')
    
    def btn1Method(self, event): # 이벤트 처리 메소드. event handler
        #print('버튼 1을 누르셨습니다')
        temp = self.txtId.GetValue()
        self.txtPwd.SetLabelText(temp)
    
    def btn2Method(self, event): # 이벤트 처리 메소드. event handler
        #print('버튼 2를 누르셨습니다')
        msgDial = wx.MessageDialog(self, '메세지', '제목', wx.OK)
        msgDial.ShowModal()
        msgDial.Destroy()
        
    def btn3Method(self, event): # 이벤트 처리 메소드. event handler
        #print('종료')
        self.Close() # wx.App()를 종료
        
    def OnClickMethod(self, event):
        #print(event.GetEventObject().id)
        #print('aa')
        if event.GetEventObject().id == 1:
            self.txtId.SetLabelText('id');
        elif event.GetEventObject().id == 2:
            self.txtPwd.SetLabelText('password');
        else:
            self.Close() 
        
if __name__ == '__main__':
    app = wx.App()
    Ex(None, title = '연습')
    app.MainLoop()

 * test30_gui2

# 레이아웃 관리자 클래스 중 Boxsizer 연습
import wx

class MyFrame(wx.Frame):
    def __init__(self, parent, title):
        super(MyFrame, self).__init__(parent, title=title, size =(300, 350))
        
        panel1 = wx.Panel(self, -1, style = wx.SUNKEN_BORDER)
        panel2 = wx.Panel(self, -1, style = wx.SUNKEN_BORDER)
        
        panel1.SetBackgroundColour("Blue")
        panel2.SetBackgroundColour("red")
        
        box = wx.BoxSizer(wx.HORIZONTAL) # VERTICAL 
        box.Add(panel1, 1, wx.EXPAND)
        box.Add(panel2, 2, wx.EXPAND)
        
        self.SetSizer(box)
        self.Center()
        self.Show()

if __name__ == '__main__':
    app = wx.App()
    MyFrame(None, title = '레이아웃')
    app.MainLoop()

 

 - wxformbuilder - UI 편집 도구

 * test31_gui3

sourceforge.net/projects/wxformbuilder/

 

wxFormBuilder

Download wxFormBuilder for free. ALL DEVELOPMENT MOVED TO GITHUB https://github.com/wxFormBuilder/wxFormBuilder wxFormBuilder - a RAD tool for wxWidgets GUI design.

sourceforge.net

import wx
import wx.xrc

class MyFrame1 ( wx.Frame ):
    
    def __init__( self, parent ):
        wx.Frame.__init__ ( self, parent, id = wx.ID_ANY, title = u"연습", pos = wx.DefaultPosition, size = wx.Size( 288,154 ), style = wx.DEFAULT_FRAME_STYLE|wx.TAB_TRAVERSAL )
        
        #self.SetSizeHintsSz( wx.DefaultSize, wx.DefaultSize )
        self.SetBackgroundColour( wx.SystemSettings.GetColour( wx.SYS_COLOUR_MENU ) )
        
        bSizer1 = wx.BoxSizer( wx.VERTICAL )
        
        self.m_panel1 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer2 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.m_staticText1 = wx.StaticText( self.m_panel1, wx.ID_ANY, u"이름 : ", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.m_staticText1.Wrap( -1 )
        bSizer2.Add( self.m_staticText1, 0, wx.ALL, 5 )
        
        self.txtName = wx.TextCtrl( self.m_panel1, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer2.Add( self.txtName, 0, wx.ALL, 5 )
        
        
        self.m_panel1.SetSizer( bSizer2 )
        self.m_panel1.Layout()
        bSizer2.Fit( self.m_panel1 )
        bSizer1.Add( self.m_panel1, 0, wx.ALL|wx.EXPAND, 5 )
        
        self.m_panel2 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer3 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.m_staticText2 = wx.StaticText( self.m_panel2, wx.ID_ANY, u"나이 : ", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.m_staticText2.Wrap( -1 )
        bSizer3.Add( self.m_staticText2, 0, wx.ALL, 5 )
        
        self.txtAge = wx.TextCtrl( self.m_panel2, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer3.Add( self.txtAge, 0, wx.ALL, 5 )
        
        self.btnOk = wx.Button( self.m_panel2, wx.ID_ANY, u"확인", wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer3.Add( self.btnOk, 0, wx.ALL, 5 )
        
        
        self.m_panel2.SetSizer( bSizer3 )
        self.m_panel2.Layout()
        bSizer3.Fit( self.m_panel2 )
        bSizer1.Add( self.m_panel2, 0, wx.EXPAND |wx.ALL, 5 )
        
        self.m_panel3 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer4 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.m_staticText3 = wx.StaticText( self.m_panel3, wx.ID_ANY, u"결과 : ", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.m_staticText3.Wrap( -1 )
        bSizer4.Add( self.m_staticText3, 0, wx.ALL, 5 )
        
        self.staShow = wx.StaticText( self.m_panel3, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
        self.staShow.Wrap( -1 )
        bSizer4.Add( self.staShow, 0, wx.ALL, 5 )
        
        
        self.m_panel3.SetSizer( bSizer4 )
        self.m_panel3.Layout()
        bSizer4.Fit( self.m_panel3 )
        bSizer1.Add( self.m_panel3, 0, wx.EXPAND |wx.ALL, 5 )
        
        
        self.SetSizer( bSizer1 )
        self.Layout()
        
        self.Centre( wx.BOTH )
        
        # Connect Events
        self.btnOk.Bind( wx.EVT_BUTTON, self.OnClickMethod )
    
    def __del__( self ):
        pass
    
    
    # Virtual event handlers, overide them in your derived class
    def OnClickMethod( self, event ):
        #event.Skip()
        name = self.txtName.GetValue()
        if name == "":
            wx.MessageBox('이름 입력', '알림', wx.OK)
            return
        age = self.txtAge.GetValue()
        
        self.staShow.SetLabelText(name+" " + age)
    
if __name__ == '__main__':
    app = wx.App()
    MyFrame1(None).Show()
    app.MainLoop()
    

 

 * test32_gui_calc

import wx
import wx.xrc

class MyFrame1 ( wx.Frame ):
    
    def __init__( self, parent ):
        wx.Frame.__init__ ( self, parent, id = wx.ID_ANY, title = wx.EmptyString, pos = wx.DefaultPosition, size = wx.Size( 308,303 ), style = wx.DEFAULT_FRAME_STYLE|wx.TAB_TRAVERSAL )
        
        #self.SetSizeHintsSz( wx.DefaultSize, wx.DefaultSize )
        self.SetBackgroundColour( wx.SystemSettings.GetColour( wx.SYS_COLOUR_MENU ) )
        
        bSizer1 = wx.BoxSizer( wx.VERTICAL )
        
        self.m_panel1 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer2 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.m_staticText1 = wx.StaticText( self.m_panel1, wx.ID_ANY, u"숫자1", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.m_staticText1.Wrap( -1 )
        bSizer2.Add( self.m_staticText1, 0, wx.ALL, 5 )
        
        self.txtNum1 = wx.TextCtrl( self.m_panel1, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer2.Add( self.txtNum1, 0, wx.ALL, 5 )
        
        
        self.m_panel1.SetSizer( bSizer2 )
        self.m_panel1.Layout()
        bSizer2.Fit( self.m_panel1 )
        bSizer1.Add( self.m_panel1, 0, wx.EXPAND |wx.ALL, 5 )
        
        self.m_panel2 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer3 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.m_staticText2 = wx.StaticText( self.m_panel2, wx.ID_ANY, u"숫자2", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.m_staticText2.Wrap( -1 )
        bSizer3.Add( self.m_staticText2, 0, wx.ALL, 5 )
        
        self.txtNum2 = wx.TextCtrl( self.m_panel2, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer3.Add( self.txtNum2, 0, wx.ALL, 5 )
        
        
        self.m_panel2.SetSizer( bSizer3 )
        self.m_panel2.Layout()
        bSizer3.Fit( self.m_panel2 )
        bSizer1.Add( self.m_panel2, 0, wx.EXPAND |wx.ALL, 5 )
        
        self.m_panel3 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer4 = wx.BoxSizer( wx.VERTICAL )
        
        rdoOpChoices = [ u"+", u"-", u"*", u"/" ]
        self.rdoOp = wx.RadioBox( self.m_panel3, wx.ID_ANY, u"연산자 선택", wx.DefaultPosition, wx.DefaultSize, rdoOpChoices, 1, wx.RA_SPECIFY_ROWS )
        self.rdoOp.SetSelection( 2 )
        bSizer4.Add( self.rdoOp, 0, wx.ALL|wx.EXPAND, 5 )
        
        
        self.m_panel3.SetSizer( bSizer4 )
        self.m_panel3.Layout()
        bSizer4.Fit( self.m_panel3 )
        bSizer1.Add( self.m_panel3, 0, wx.EXPAND |wx.ALL, 5 )
        
        self.m_panel4 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer5 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.m_staticText3 = wx.StaticText( self.m_panel4, wx.ID_ANY, u"연산결과 : ", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.m_staticText3.Wrap( -1 )
        bSizer5.Add( self.m_staticText3, 0, wx.ALL, 5 )
        
        self.staResult = wx.StaticText( self.m_panel4, wx.ID_ANY, u"0", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.staResult.Wrap( -1 )
        bSizer5.Add( self.staResult, 0, wx.ALL, 5 )
        
        
        self.m_panel4.SetSizer( bSizer5 )
        self.m_panel4.Layout()
        bSizer5.Fit( self.m_panel4 )
        bSizer1.Add( self.m_panel4, 0, wx.EXPAND |wx.ALL, 5 )
        
        self.m_panel5 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer6 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.btnCalc = wx.Button( self.m_panel5, wx.ID_ANY, u"계산", wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer6.Add( self.btnCalc, 0, wx.ALL, 5 )
        
        self.btnClear = wx.Button( self.m_panel5, wx.ID_ANY, u"초기화", wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer6.Add( self.btnClear, 0, wx.ALL, 5 )
        
        self.btnExit = wx.Button( self.m_panel5, wx.ID_ANY, u"종료", wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer6.Add( self.btnExit, 0, wx.ALL, 5 )
        
        
        self.m_panel5.SetSizer( bSizer6 )
        self.m_panel5.Layout()
        bSizer6.Fit( self.m_panel5 )
        bSizer1.Add( self.m_panel5, 1, wx.EXPAND |wx.ALL, 5 )
        
        
        self.SetSizer( bSizer1 )
        self.Layout()
        
        self.Centre( wx.BOTH )
        # Connect Events
        self.btnCalc.id  = 1 # id 부여
        self.btnClear.id = 2
        self.btnExit.id  = 3
        self.btnCalc.Bind( wx.EVT_BUTTON, self.OnCalcProcess )
        self.btnClear.Bind( wx.EVT_BUTTON, self.OnCalcProcess )
        self.btnExit.Bind( wx.EVT_BUTTON, self.OnCalcProcess )
    
    def __del__( self ):
        pass
    
    
    # Virtual event handlers, overide them in your derived class
    def OnCalcProcess( self, event ):
        sel_id = event.GetEventObject().id # event 발생 객체의 id를 get
        #print(sel_id)
        if sel_id == 1: # 계산 버튼을 누를 경우
            op = self.rdoOp.GetStringSelection() # 선택된 라디오 버튼의 문자열 get  
            #print(op)
            num1 = self.txtNum1.GetValue()
            num2 = self.txtNum2.GetValue()
            
            if num1 == '' or num2 == '':
                wx.MessageBox('값을 입력하시오', '알림', wx.OK)
                return
            try:
                result = eval(num1 + op + num2)
            except Exception as e:
                wx.MessageBox('연산오류', '알림', wx.OK)
                return
            
            self.staResult.SetLabel(str(result))
        elif sel_id == 2: # 초기화
            self.txtNum1.SetLabel('')
            self.txtNum2.SetLabel('')
            self.staResult.SetLabel('')
            self.rdoOp.SetSelection(0)
            self.txtNum1.SetFocus()     # UX/UI
        elif sel_id == 3:
            dlg = wx.MessageDialog(self, '정말 종료할까요?', '알림', wx.YES_NO) # MessageDialog style 입력
            temp = dlg.ShowModal() # MessageDialog 창 발생 
            if temp == wx.ID_YES:  # YES 버튼을 누를 경우
                dlg.Destroy()      # MessageDialog창 닫기 
                self.Close()       # Frame 닫기
        
if __name__ == '__main__':
    app = wx.App()
    MyFrame1(None).Show()
    app.MainLoop()


25. SQLite

 : 개인용 DB SQLite3 : Python에 기본 설치됨

 : 환경변수 path에 추가 : C:\anaconda3\Library\bin

 * test33_sqlite

import sqlite3
print(sqlite3.version_info)

#conn = sqlite3.connect('example.db')
conn = sqlite3.connect(':memory:') # ram에 DB 생성 - 테스트용으로 연습할 때

try:
    cur = conn.cursor()
    
    cur.execute("create table if not exists friends(name text, phone text)")
    
    cur.execute("insert into friends values('홍길동', '111-1111')")
    cur.execute("insert into friends values(?, ?)", ('tom', '222-2222'))
    conn.commit()
    cur.execute("select * from friends")
    for f in cur:
        print(f[0] + " " + f[1])
    
except Exception as e:
    print(e)
    conn.rollback()
finally:
    conn.close()

26. 원격 DB 연동

 : maria db

 : anaconda prompt : pip install mysqlclient 입력

 * test34_db

import MySQLdb

conn = MySQLdb.connect(host = '127.0.0.1', user = 'root', password='123', database='test')
print(conn)
conn.close

sangdata table 자료 읽기

import MySQLdb

config = {
    'host':'127.0.0.1',
    'user':'root',
    'password':'123',
    'database':'test',
    'port':3306,
    'charset':'utf8',
    'use_unicode':True
}
try:
    conn = MySQLdb.connect(**config)
    cursor = conn.cursor() # sql문 처리용
    
    #자료추가1
    sql = "insert into sangdata(code, sang, su, dan) values(10, '새우깡', 5, 1200)" 
    cursor.execute(sql)
    conn.commit()
    sql = "insert into sangdata values(%s, %s, %s, %s)"
    #sql_data = ('12', '감자깡', 10, 2000)
    sql_data = '12', '감자깡', 10, 2000
    re = cursor.execute(sql, sql_data) # 성공하면 1, 실패시 0
    print(cursor)
    print(re)
    conn.commit()
    # 자료 수정
    sql = "update sangdata set sang=%s, su=%s, dan=%s where code=%s"
    sql_data = '깡', 7, 1000, 12
    cou = cursor.execute(sql, sql_data) # 성공하면 n, 실패시 0
    print('성공건수 ',cou)
    conn.commit()
    # 자료삭제
    code = '10'
    sql = "delete from sangdata where code =" + code # 비권장
    cursor.execute(sql)
    sql = "delete from sangdata where code = %s" # 권장 1
    cursor.execute(sql, (code,)) # 튜플
    sql = "delete from sangdata where code = '{0}'".format(code) # 권장 2
    cou = cursor.execute(sql)
    print('성공건수 ',cou)
    conn.commit()
    # 자료읽기 1
    sql = "select code, sang, su, dan from sangdata"
    cursor.execute(sql)    # DB의 자료를 읽어 cursor 객체가 그 결과를 기억
    
    for data in cursor.fetchall():
        #print(data)
        print('%s %s %s %s'%data)
    # 자료읽기 2
    for r in cursor:
        print(r[0], r[1], r[2], r[3])
    # 자료읽기 3
    for (code, sang, su, dan) in cursor:
        print(code, sang, su, dan)
    
    for (a, b, c, d) in cursor:
        print(a, b, c, d)
except MySQLdb.connections.Error as err:
    print('db err' + str(err))
except Exception as e:
    print('err' + str(e))
finally:
    cursor.close()
    conn.close()

 

 - 키보드로 부서번호를 입력받아 해당 부서에 근무하는 직원 출력

 * test35_db

import MySQLdb
import sys

config = {
    'host':'127.0.0.1',
    'user':'root',
    'password':'123',
    'database':'test',
    'port':3306,
    'charset':'utf8',
    'use_unicode':True
}

try:
    conn = MySQLdb.connect(**config)
    cursor = conn.cursor()
    
    buser_no = input('부서번호 입력 : ')
    sql = """
    select jikwon_no, jikwon_name, buser_num, jikwon_jik, jikwon_pay
    from jikwon
    where buser_num ={0}
    """.format(buser_no)
    
    cursor.execute(sql)
    datas = cursor.fetchall()
    
    if len(datas) == 0:
        print(str(buser_no)+ '번 부서는 없습니다.')
        sys.exit()
        
    for d in datas:
        print(d[0], d[1], d[2], d[3])
        
    print('인원수 : ' + str(len(datas)))
    
except Exception as e:
    print('err', e)
    
finally:
    cursor.close()
    conn.close()

 * test36_db_gui

config = {
    'host':'127.0.0.1',
    'user':'root',
    'password':'123',
    'database':'test',
    'port':3306,
    'charset':'utf8',
    'use_unicode':True
}

 - 파일 분리

import ast

with open('mariadb.txt', mode = 'r') as f:
    #print(f.read())
    aa = f.read()
    config = ast.literal_eval(aa)
import wx
import wx.xrc
import MySQLdb

class MyGogek ( wx.Frame ):
    
    def __init__( self, parent ):
        wx.Frame.__init__ ( self, parent, id = wx.ID_ANY, title = u"고객자료 보기", pos = wx.DefaultPosition, size = wx.Size( 500,336 ), style = wx.DEFAULT_FRAME_STYLE|wx.TAB_TRAVERSAL )
        
        #self.SetSizeHintsSz( wx.DefaultSize, wx.DefaultSize )
        self.SetBackgroundColour( wx.SystemSettings.GetColour( wx.SYS_COLOUR_MENU ) )
        
        bSizer1 = wx.BoxSizer( wx.VERTICAL )
        
        self.m_panel1 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer2 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.m_staticText1 = wx.StaticText( self.m_panel1, wx.ID_ANY, u"사번 : ", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.m_staticText1.Wrap( -1 )
        bSizer2.Add( self.m_staticText1, 0, wx.ALL, 5 )
        
        self.txtNo = wx.TextCtrl( self.m_panel1, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer2.Add( self.txtNo, 0, wx.ALL, 5 )
        
        self.m_staticText2 = wx.StaticText( self.m_panel1, wx.ID_ANY, u"직원명", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.m_staticText2.Wrap( -1 )
        bSizer2.Add( self.m_staticText2, 0, wx.ALL, 5 )
        
        self.txtName = wx.TextCtrl( self.m_panel1, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer2.Add( self.txtName, 0, wx.ALL, 5 )
        
        self.btnLogin = wx.Button( self.m_panel1, wx.ID_ANY, u"로그인", wx.DefaultPosition, wx.DefaultSize, 0 )
        bSizer2.Add( self.btnLogin, 0, wx.ALL, 5 )
        
        
        self.m_panel1.SetSizer( bSizer2 )
        self.m_panel1.Layout()
        bSizer2.Fit( self.m_panel1 )
        bSizer1.Add( self.m_panel1, 0, wx.EXPAND |wx.ALL, 5 )
        
        self.m_panel2 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer3 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.staMsg = wx.StaticText( self.m_panel2, wx.ID_ANY, u"고객목록", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.staMsg.Wrap( -1 )
        bSizer3.Add( self.staMsg, 0, wx.ALL, 5 )
        
        
        self.m_panel2.SetSizer( bSizer3 )
        self.m_panel2.Layout()
        bSizer3.Fit( self.m_panel2 )
        bSizer1.Add( self.m_panel2, 0, wx.EXPAND |wx.ALL, 5 )
        
        self.m_panel3 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer4 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.lstGogek = wx.ListCtrl( self.m_panel3, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.LC_REPORT )
        bSizer4.Add( self.lstGogek, 1, wx.ALL|wx.EXPAND, 5 )
        
        
        self.m_panel3.SetSizer( bSizer4 )
        self.m_panel3.Layout()
        bSizer4.Fit( self.m_panel3 )
        bSizer1.Add( self.m_panel3, 1, wx.EXPAND |wx.ALL, 5 )
        
        self.m_panel4 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
        bSizer5 = wx.BoxSizer( wx.HORIZONTAL )
        
        self.staCount = wx.StaticText( self.m_panel4, wx.ID_ANY, u"인원수", wx.DefaultPosition, wx.DefaultSize, 0 )
        self.staCount.Wrap( -1 )
        bSizer5.Add( self.staCount, 0, wx.ALL, 5 )
        
        
        self.m_panel4.SetSizer( bSizer5 )
        self.m_panel4.Layout()
        bSizer5.Fit( self.m_panel4 )
        bSizer1.Add( self.m_panel4, 1, wx.EXPAND |wx.ALL, 5 )
        
        
        self.SetSizer( bSizer1 )
        self.Layout()
        
        self.Centre( wx.BOTH )
        
        # lstGogek 객체에 제목 작성
        self.lstGogek.InsertColumn(0, '고객번호', width=100)
        self.lstGogek.InsertColumn(1, '고객명', width=150)
        self.lstGogek.InsertColumn(2, '고객전화', width=200)
        
        # Connect Events
        self.btnLogin.Bind( wx.EVT_BUTTON, self.OnLogin )
    
    def __del__( self ):
        pass
    # Virtual event handlers, overide them in your derived class
    def OnLogin( self, event ):
        if self.txtNo.GetValue() == '': # 사번을 입력안할 경우 팝업 발생
            wx.MessageBox('사번입력', '알림', wx.OK)
            self.txtNo.SetFocus()
            return
        
        if self.txtName.GetValue() == '': # 사번을 입력안할 경우 팝업 발생
            wx.MessageBox('직원명 입력', '알림', wx.OK)
            self.txtNo.SetFocus()
            return
        
        self.LoginCheck()
    def LoginCheck(self):
        try:
            conn = MySQLdb.connect(**config)
            cursor = conn.cursor()
            no = self.txtNo.GetValue()
            name = self.txtName.GetValue()
            #print(no, name)
            
            sql ="""
            select count(*) from jikwon
            where jikwon_no ='{0}' and jikwon_name ='{1}'
            """.format(no, name)
            
            cou = cursor.execute(sql)
            count = cursor.fetchone()[0]
            #print(count) # 1
            if count == 0:
                wx.MessageBox('로그인 실패', '알림', wx.OK)
                return
            else:
                self.staMsg.SetLabelText(no + '번 직원의 관리 고객목록')
                self.DisplayData(no) # 직원자료 출력 메소드 호출
            
        except Exception as e:
            print('LoginCheck err',e)
        finally:
            cursor.close()
            conn.close()
    def DisplayData(self, no):
        try:
            conn = MySQLdb.connect(**config)
            cursor = conn.cursor()
            
            sql = """
            select gogek_no, gogek_name, gogek_tel
            from gogek
            where gogek_damsano = {}
            """.format(no)
            
            cou = cursor.execute(sql)
            datas = cursor.fetchall()
            #print(datas)
            self.lstGogek.DeleteAllItems() # list control 초기화
            for d in datas:
                i = self.lstGogek.InsertItem(1000, 0) # List Control 최대 행수 입력
                self.lstGogek.SetItem(i, 0, str(d[0]))
                self.lstGogek.SetItem(i, 1, d[1])
                self.lstGogek.SetItem(i, 2, d[2])
            self.staCount.SetLabelText('인원수 : ' + str(len(datas)))
        except Exception as e:
            print('DisplayData err',e)
        finally:
            cursor.close()
            conn.close()
            
            
if __name__ == '__main__':
    app = wx.App()
    MyGogek(None).Show()
    app.MainLoop()
    

 - mariadb.txt

{
    'host':'127.0.0.1',
    'user':'root',
    'password':'123',
    'database':'test',
    'port':3306,
    'charset':'utf8',
    'use_unicode':True
}

27. socket

 - Socket : TCP/IP Protocol(통신규약)의 프로그래머 인터페이스를 모듈로 지원

 

 * socket_test

import socket

print(socket.getservbyname('http','tcp'))   # http port 80
print(socket.getservbyname('telnet','tcp')) # http port 23
print(socket.getservbyname('ftp','tcp'))    # http port 21
print(socket.getservbyname('smtp','tcp'))   # http port 25
print(socket.getservbyname('pop3','tcp'))   # http port 110

print()
print(socket.getaddrinfo('www.naver.com', 80, proto=socket.SOL_TCP))
# [(<AddressFamily.AF_INET: 2>, 0, 6, '', ('125.209.222.142', 80)), (<AddressFamily.AF_INET: 2>, 0, 6, '', ('223.130.195.200', 80))]
print(socket.getaddrinfo('www.daum.net', 80, proto=socket.SOL_TCP))

 

 * soc1_server

# 접속상태 확인을 위한 단순 Echo Server - client의 요청에 1회만 반응

from socket import *

serverSock = socket(AF_INET, SOCK_STREAM) # socket(socket 종류, socket 유형)
serverSock.bind(('127.0.0.1', 9999))
serverSock.listen(1)                      # TCP 리스너 설정

print('server start')

conn, addr = serverSock.accept()
print('client addr :', addr)
print('from client message : ', conn.recv(1024).decode())
# 클라이언트로 부터 바이트 단위 문자열로 수신된 내용 출력
conn.close()
serverSock.close()

 

 

 * soc1_client

# 1회 접속용 클라이언트
from socket import *

clientSock = socket(AF_INET, SOCK_STREAM)
clientSock.connect(('127.0.0.1', 9999))
clientSock.sendall('안녕 반가워'.encode(encoding='utf_8', errors='strict'))
clientSock.close()

 * soc2_server

# 서버 서비스 계속 유지
import socket
import sys

HOST = ''
PORT = 8888

serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

try:
    serverSock.bind((HOST, PORT))
    print('server start')
    serverSock.listen(5) # 클라이언트 접속 대기. 최대 동시 접속수는 1 ~ 5
    
    while True:
        conn,addr = serverSock.accept() # 클라이언트 접속 승인
        print('client info : ', addr[0], addr[1])
        print(conn.recv(1024).decode()) # 클라이언트가 전송한 메세지 수신 후 출력
        
        # 전송
        conn.send('from server : ' + str(addr[1]) + 'ㅁㅁㅁ'.encode('utf_8'))
except socket.error as err:
    print('socket error : ', err)
except Exception as e:
    print('error : ', e)
finally:
    serverSock.close()
    

 

 * soc2_client

# 1회 접속용 클라이언트
from socket import *

clientSock = socket(AF_INET, SOCK_STREAM)
clientSock.connect(('127.0.0.1', 8888))
clientSock.sendall('안녕 반가워'.encode(encoding='utf_8'))
re_msg = clientSock.recv(1024).decode()
print('수신자료', re_msg)
clientSock.close()

28. Thread

 - process : 실행 중인 응용프로그램을 의미, 프로세스 단위로 별도의 메모리를 사용

from subprocess import *

Popen('calc.exe')
Popen('notepad.exe')

 - thread : Light weight process 라고도 함. 메인 프로세스(스레드)와 병렬적으로 수행되는 단위 프로그램. 스레드 단위로 함수나 메소드를 수행 가능.

 

 * th1

import time

def run(id):
    for i in range(1, 11):
        print('id:{} -> {}'.format(id, i))
        time.sleep(0.5)

  1. thread를 사용하지 않은 경우

run(1) # 순차적으로 호출되므로 순차적으로 처리됨
run(2)

  2. thread를 사용한 경우 : 스레드 스케쥴러에 의해 랜덤하게 스레드 처리가 됨

import threading

th1 = threading.Thread(target=run, args = ('일')) # 사용자 정의 스레드 생성
th2 = threading.Thread(target=run, args = ('이'))
th1.start() # 스레드 수행 시작
th2.start()
th1.join()  # 사용자 정의 스레드가 종료될때까지 메인 스레드 대기 시킴 
th2.join()

print('메인 프로그램 종료') # 메인 스레드에 의해 메인 모듈이 실행(기본 값)

 * th2

스레드를 이용하여 날짜 및 시간 출력

import time
import threading

now = time.localtime()
#print(now)
print('현재는 {0}년 {1}월 {2} 일 {3} 시 {4} 분 {5} 초'.format(now.tm_year, now.tm_mon, \
                                                     now.tm_mday ,now.tm_hour,now.tm_min,now.tm_sec))

def cal_show():
        now = time.localtime()
        print('현재는 {0}년 {1}월 {2} 일 {3} 시 {4} 분 {5} 초'.format(now.tm_year, now.tm_mon, \
                                                     now.tm_mday ,now.tm_hour,now.tm_min,now.tm_sec))

def myRun():
    while True:
        now2 = time.localtime()
        if now2.tm_min == 57:
            break
        cal_show()
        time.sleep(1)
        
th = threading.Thread(target=myRun)
th.start()
th.join()
class MyThread(threading.Thread):
    def run(self):
        for i in range(1, 10):
            print('id{} --> {}'.format(self.getName(), i))
            time.sleep(0.1)
            
ths = []
for i in range(2):
    th = MyThread()
    th.start()
    ths.append(th)
    
for th in ths:
    th.join()
    
print('프로그램 종료')

 - 스레드간 공유자원 값 충돌 방지 - 동기화

 * th3

import threading, time
g_count = 0 # 전역변수는 자동으로 스레드의 공유자원이 됨

lock = threading.Lock()

def threadCount(id, count):
    global g_count
    for i in range(count):
        lock.acquire() # 두 개 이상의 스레드 공유 자원 충돌방지. lock을 걸어 다른 스레드 수행대기.
        print('id %s => count : %s, g_count : %s'%(id, i, g_count))
        g_count += 1
        lock.release() # lock을 해제

for i in range(1, 6):
    threading.Thread(target=threadCount, args =(i, 5)).start()

time.sleep(2)
print('final g_count : ', g_count)
print('finish process')

 - 스레드의 활성화/비활성화

 * th4

import threading, time
bread_plate = 0 # 빵 접시 - 공유자원
lock = threading.Condition()

class Maker(threading.Thread): # 빵 생성자
    def run(self):
        global bread_plate
        
        for i in range(30):
            lock.acquire() # 공유자운 충동방지
            
            while bread_plate >=10:
                print('빵 생산 초과로 대기')
                lock.wait() # 쓰레드 비활성화
                
            bread_plate += 1
            print('빵 생산 후 접시에 담기 : ', bread_plate)
            
            lock.notify() # 쓰레드 활성화
            lock.release()
            time.sleep(0.05)
            
class Eater(threading.Thread): # 빵 소비자
    def run(self):
        global bread_plate
        
        for i in range(30):
            lock.acquire() # 공유자원 충동방지
            
            while bread_plate <1:
                print('빵이 없어 대기')
                lock.wait() # 쓰레드 비활성화
                
            bread_plate -= 1
            print('빵 소비 후 접시의 빵 수 : ', bread_plate)
            
            lock.notify() # 쓰레드 활성화
            lock.release()
            time.sleep(0.07)
mak = []
con = []
for i in range(5): # 빵 생산자 수
    mak.append(Maker())
    
for i in range(5): # 빵 소비자 수
    con.append(Eater())
    
for th1 in mak:
    th1.start()
    
for th2 in con:
    th2.start()
    
for th1 in mak:
    th1.join()
    
for th2 in con:
    th2.join()
    
print('process 끝')

29. 멀티 쓰레드 채팅 프로그램

 : socket, thread 사용

 * chatServer (서버)

import socket
import threading

ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 서버 소켓 생성
ss.bind(('127.0.0.1', 5000))                           # ip에 연결
ss.listen(5)                                           # 동접속자 수 최대 5로 리스너 설정
print('Chatting start')
users = []

def ChatUser(conn):
    name = conn.recv(1024)
    data = '[알림]' + name.decode('utf_8') + '님이 접속하셨습니다.'
    print(data)
    
    try:
        for p in users:                                # 다른 클라이언트에도 접속 메시지 send
            p.send(data.encode('utf_8'))
            
        while True:                                    # 채팅 진행
            msg = conn.recv(1024)                      # 수신한 메세지
            data = '('+name.decode('utf_8') + ') : ' + msg.decode() # 송부자 + 메세지
            print('[R] '+data)
            for p in users:                            # 다른 클라이언트에도 메시지 send
                p.send(data.encode('utf_8'))
    except Exception as e:                             # 접속 종료 시
        users.remove(conn)                             # 접속 종료 시 list에 conn 제거
        data = '[알림]' + name.decode('utf_8') + '님이 접속을 종료하셨습니다.'
        print(data)
        print(e)
        if users:                                      # 다른 클라이언트에도 접속 종료 메시지 send
            for p in users:
                p.send(data.encode('utf_8'))
        else:
            print('exit')
    
while True:
    conn, addr = ss.accept()                           # 소켓 접속 및 대기. 클라이언트 접속 시 값 리턴
    users.append(conn)                                 # 접속 시 list에 conn 추가
    th = threading.Thread(target=ChatUser, args = (conn, )) # 접속자 수 만큼 쓰레드 생성
    th.start()

 

 * chatClient (클라이언트)

import socket
import threading
import sys
def Handle(socket):
    while True:
        data = socket.recv(1024)
        if not data: continue
        print(data.decode('utf_8'))                    # 파이썬 표준 출력은 버퍼링 된다.
        #print(data.decode('utf_8', flush=true))
        
sys.stdout.flush()                                     # 표준 입출력 버퍼 비움

cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 클라이언트 소켓 생성
cs.connect(('127.0.0.1', 5000))                        # 해당 ip에 접속

name = input('채팅 아이디 입력 : ')
cs.send(name.encode('utf_8'))                          # 접속 id send

th = threading.Thread(target=Handle, args = (cs,))     # 쓰레드 생성
th.start()

while True:
    msg = input()                                      # 채팅 메세지 입력
    sys.stdout.flush()
    
    if not msg:continue
    cs.send(msg.encode('utf_8'))                       # msg가 있을 경우 msg send

cs.close()

 


30. pool

GIL(Global Interpreter Lock) 파이썬 인터프리터가 한 스레드만 하나의 바이트코드를 실행 시킬 수 있도록 해주는 Lock
하나의 스레드에 모든 자원을 허락하고 그 후에는 Lock을 걸어 다른 스레드는 실행할 수 없게 막아버린다.
그런 이유로 구조적으로 충돌이 발생하는 경우가 발생한다.
이에 개선사항으로 멀티 프로세싱 모듈 지원한다.

Pool : 압력 값에 대해 process들을 건너건너 분배하여 함수 실행을 병렬하는 방법
PID : process id

 

 * 07_multi_pool

from multiprocessing import Pool
import time 
import os
def f(x):
    print('값', x, '에 대한 작업 pid', os.getpid()) # 현재 진행 중인 process의 processId를 반환
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    p = Pool(3) # pool 객체 생성. 프로세스 수 3 ~ 5 권장
    startTime = int(time.time())
    
    
    for i in range(0, 10): # 0 ~ 9
        print(f(i)) # 10
    
    # 값 0 에 대한 작업 pid 75580
    # 0
    # 값 1 에 대한 작업 pid 75580
    # 1
    # 값 2 에 대한 작업 pid 75580
    # 4
    # 값 3 에 대한 작업 pid 75580
    # 9
    # 값 4 에 대한 작업 pid 75580
    # 16
    # 값 5 에 대한 작업 pid 75580
    # 25
    # 값 6 에 대한 작업 pid 75580
    # 36
    # 값 7 에 대한 작업 pid 75580
    # 49
    # 값 8 에 대한 작업 pid 75580
    # 64
    # 값 9 에 대한 작업 pid 75580
    # 81
    # 총 작업 시간 :  10
    
    print(p.map(f, range(0, 10))) # 함수와 인자값을 매핑하면서 데이터를 분배처리
    # 값 0 에 대한 작업 pid 75712
    # 값 1 에 대한 작업 pid 75604
    # 값 2 에 대한 작업 pid 75540
    # 값 3 에 대한 작업 pid 75712
    # 값 4 에 대한 작업 pid 75604
    # 값 5 에 대한 작업 pid 75540
    # 값 6 에 대한 작업 pid 75712
    # 값 7 에 대한 작업 pid 75604
    # 값 8 에 대한 작업 pid 75540
    # 값 9 에 대한 작업 pid 75712
    # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    # 총 작업 시간 :  5
    endTime = int(time.time())
    print('총 작업 시간 : ', (endTime - startTime))

 


31. process

멀티 프로세실을 위한 Process 클래스
하나의 프로세스를 하나의 함수에 적당한 인자값을 할당해주고 (없을 수도 있음) 진행

 

 * 08_multi_process

import time
import os
from multiprocessing import Process

def func():
    print('연속 진행하고자 하는 어떤 작업')
    #time.sleep(1)

def doubler(number):
    result = number + 10
    func()
    proc = os.getpid()
    print('number : {0}, number : {1},  process id : {2}'.format(number, result, proc))
    
if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    procs = []
    
    for index, number in enumerate(numbers):
        proc = Process(target = doubler, args = (number, ))
        procs.append(proc) # Process에 join() 추가할 의도
        proc.start() # doubler 함수가 호출
        
    for proc in procs:
        proc.join()
# 연속 진행하고자 하는 어떤 작업
# number : 1, number : 11,  process id : 11880
# 연속 진행하고자 하는 어떤 작업
# number : 2, number : 12,  process id : 59068
# 연속 진행하고자 하는 어떤 작업
# number : 3, number : 13,  process id : 55000
# 연속 진행하고자 하는 어떤 작업
# number : 4, number : 14,  process id : 74932
# 연속 진행하고자 하는 어떤 작업
# number : 5, number : 15,  process id : 55096

32. 웹 크롤링

멀티 프로세싱을 통한 웹 크롤링 연습 1 - 멀티 프로세싱 없이 작업

 * 09_multi_web1

import requests
from bs4 import BeautifulSoup as bs
# html, xml 지원, json 지원x
import time

def get_links(): # 해당 컨텐츠의 a tag 얻기
    data = requests.get("https://beomi.github.io/beomi.github.io_old/").text
    soup = bs(data, 'html.parser')
    my_titles = soup.select('h3 > a')
    data = []
    
    for title in my_titles:
        data.append(title.get('href')) # a tag의 속성 중 href 값 반환
    
    return data

def get_content(link):
    abs_link = "https://beomi.github.io" + link
    req = requests.get(abs_link)
    html = req.text
    soup = bs(html, 'html.parser')
    print(soup.select('h1')[0].text) # 첫번째 h1 tag의 문자열 출력    
    
if __name__ == '__main__':
    #print(get_links())
    #print(len(get_links())) # 26
    start_time = time.time()
    for link in get_links():
        get_content(link)
        
    end_time = time.time()
    print('소요시간 : %s 초'%(end_time- start_time))
    
# 나만의 웹 크롤러 만들기(4): Django로 크롤링한 데이터 저장하기
# 나만의 웹 크롤러 만들기(3): Selenium으로 무적 크롤러 만들기
# Django에 Social Login 붙이기: Django세팅부터 Facebook/Google 개발 설정까지
# Django에 Custom인증 붙이기
# 나만의 웹 크롤러 만들기(2): Login with Session
# 나만의 웹 크롤러 만들기 with Requests/BeautifulSoup
# Celery로 TelegramBot 알림 보내기
# Virtualenv/VirtualenvWrapper OS별 설치&이용법
# [DjangoTDDStudy] #02: UnitTest 이용해 기능 테스트 하기
# [DjangoTDDStudy] #01: 개발환경 세팅하기(Selenium / ChromeDriver)
# [DjangoTDDStudy] #00: 스터디를 시작하며
# Fabric Put 커맨드가 No Such File Exception을 반환할 때 해결법
# CKEditor의 라이센스와 오픈소스 라이센스
# ReactNative The Basis 번역을 끝냈습니다.
# [React Native 번역]#01: 시작하기
# [번역] 장고(Django)와 함께하는 Celery 첫걸음
# Chrome Native Adblockr 대체하기
# CustoMac 설치 분투기
# Ubuntu14.04에 OhMyZsh 설치
# Ubuntu14.04에서 pip로 mysqlclient 설치 실패시
# Ubuntu14.04에서 Python3기반 virtualenvwrapper 설치
# mac OS X에서 pip virtualenvwrapper 설치 시 uninstalling six 에서 Exception 발생 시
# Fabric for Python3 (Fabric3)
# Windows에서 pip로 mysqlclient 설치 실패시(python3.4/3.5)
# 맥에서 윈도RDP로 접속시 한영전환하기.
# pip로 mysqlclient설치 중 mac os x에서 egg_info / OSError 발생시 대처방법
# 소요시간 : 7.727251768112183 초

 

멀티 프로세싱을 통한 웹 크롤링 연습 2 - 멀티 프로세싱 사용

 * 10_multi_web2

import requests
from bs4 import BeautifulSoup as bs
import time
from multiprocessing import Pool

def get_links(): # 해당 컨텐츠의 a tag 얻기
    data = requests.get("https://beomi.github.io/beomi.github.io_old/").text
    soup = bs(data, 'html.parser')
    my_titles = soup.select('h3 > a')
    data = []
    
    for title in my_titles:
        data.append(title.get('href')) # a tag의 속성 중 href 값 반환
    
    return data

def get_content(link):
    abs_link = "https://beomi.github.io" + link
    req = requests.get(abs_link)
    html = req.text
    soup = bs(html, 'html.parser')
    print(soup.select('h1')[0].text) # 첫번째 h1 tag의 문자열 출력    
    
if __name__ == '__main__':
    start_time = time.time()
#     for link in get_links():
#         get_content(link)
    
    pool = Pool(processes = 4)
    pool.map(get_content, get_links())
        
    end_time = time.time()
    print('소요시간 : %s 초'%(end_time- start_time))
    
# Django에 Social Login 붙이기: Django세팅부터 Facebook/Google 개발 설정까지
# 나만의 웹 크롤러 만들기(4): Django로 크롤링한 데이터 저장하기
# Celery로 TelegramBot 알림 보내기
# 나만의 웹 크롤러 만들기(2): Login with Session
# 나만의 웹 크롤러 만들기(3): Selenium으로 무적 크롤러 만들기
# Django에 Custom인증 붙이기
# 나만의 웹 크롤러 만들기 with Requests/BeautifulSoup
# Virtualenv/VirtualenvWrapper OS별 설치&이용법
# [DjangoTDDStudy] #00: 스터디를 시작하며
# [DjangoTDDStudy] #02: UnitTest 이용해 기능 테스트 하기
# CKEditor의 라이센스와 오픈소스 라이센스
# [React Native 번역]#01: 시작하기
# Fabric Put 커맨드가 No Such File Exception을 반환할 때 해결법
# [DjangoTDDStudy] #01: 개발환경 세팅하기(Selenium / ChromeDriver)
# ReactNative The Basis 번역을 끝냈습니다.
# [번역] 장고(Django)와 함께하는 Celery 첫걸음
# Ubuntu14.04에 OhMyZsh 설치
# Chrome Native Adblockr 대체하기
# Ubuntu14.04에서 Python3기반 virtualenvwrapper 설치
# Fabric for Python3 (Fabric3)
# Ubuntu14.04에서 pip로 mysqlclient 설치 실패시
# CustoMac 설치 분투기
# mac OS X에서 pip virtualenvwrapper 설치 시 uninstalling six 에서 Exception 발생 시
# Windows에서 pip로 mysqlclient 설치 실패시(python3.4/3.5)
# 맥에서 윈도RDP로 접속시 한영전환하기.
# pip로 mysqlclient설치 중 mac os x에서 egg_info / OSError 발생시 대처방법
# 소요시간 : 3.3195197582244873 초

33. HttpServer

HttpServer 구축용 클래스를 이용해 웹서버 서비스 하기
HTTPServer : 기본적인 socket 연결을 관리하는 클래스
SimpleHTTPRequestHandler : get, head 요청만 처리가능

 * 01_httpServer1

from http.server import SimpleHTTPRequestHandler, HTTPServer

PORT = 7777

handler = SimpleHTTPRequestHandler
serv = HTTPServer(('127.0.0.1', PORT), handler) # HTTPServer 서버 객체 생성
print('HTTPServer 서비스 시작')
serv.serve_forever()                            # HTTPServer 서비스 시작

 * index.html

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
<h2>파이썬</h2>
<a href="https://daum.net">DAUM 접속</a>
<br>
<a href="bbb.html">bbb 문서</a>
<hr>
바닥글. 작성자 : <b style="font-family : 궁서">홍길동</b>
</body>
</html>

* bbb.html

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
단순웹 운영중<br>
<input type = "button" value = "이전으로" onclick="history.back()">
</body>
</html>


34. CGI

CGI(Common Gateway Interface)란 웹서버(정보제공측)와 클라이언트(정보이용측)간에 필요한 정보교환을 가능하게 해주는 일종의 웹인터페이스라고(일종의 프로그램) 할 수 있습니다.
PHP, ASP, ASP.NET, JSP...
CGI를 지원하는 CGIHTTPRequestHandler를 사용하면 클라이언트와 서버 사이에 자료를 주고 받을 수 있다. py 문서를 처리가능
get, post 요청처리 모두 지원

 

 * 01_httpServer2

from http.server import CGIHTTPRequestHandler, HTTPServer

PORT = 8888

class HandlerClass(CGIHTTPRequestHandler):
    cgi_directories = ['/cgi-bin']

serv = HTTPServer(('127.0.0.1', PORT), HandlerClass) # HTTPServer 서버 객체 생성

print('HTTPServer 서비스 시작')
serv.serve_forever()                            # HTTPServer 서비스 시작

 

 * main.html

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
<h1>메인 페이지</h1>
<ul>
	<li><a href="https://www.naver.com">네이버</a></li>
	<li><a href="cgi-bin/hello.py">hello.py</a></li>
	<li><a href="cgi-bin/world.py">world.py</a></li>
	<li><a href="cgi-bin/my.py?name=tom&age=23">my.py - get방식으로 자료 전달</a></li>
	<li><a href="friend.html">friend</a></li>
	<li><a href="cgi-bin/sangpum.py">상품자료 확인</a></li>
</ul>
</body>
</html>

 * hello

# 웹 서비스가 가능한 파이썬 파일 작성
ss = '파이썬 변수'
kbs = 9
mbc = 10 + 1.0
print('Content-Type:text/html;charset=utf-8\n')
print('<html>')
print('<body>')
print('<h2>파이썬 문서로 정보 전달</h2>')
print('<b><i>hi</i></b>')
print('<br>파이썬 변수 출력 : %s, %d, %f'%(ss, kbs, mbc))
print('</body>')
print('</html>')

 

 * world

s1 = "자료1"
s2 = "자료2"
print('Content-Type:text/html;charset=utf-8\n')
print("""
<html>
<body>
<h2>html문서 좀더 쉽게 작성</h2>
자료 출력 : {0}, {1}
<br>
<img src='../images/pic.png' width='60%' />
<br>
<a href='../main.html'>메인</a>
</body>
</html>
""".format(s1, s2))

 

 * my

#client가 server로 자료 전달
import cgi

# 사용자(client)가 입력한 자료 받기 - get
form = cgi.FieldStorage()
name = form['name'].value
age = form['age'].value

print('Content-Type:text/html;charset=utf-8\n')
print("""
<html>
<body>
사용자가 입력한 자료 <br>
이름 : {0},
나이 : {1}
</body>
</html>
""".format(name, age))

 * friend.html

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
친구 자료 입력<p/>
<form action="cgi-bin/friend.py" method="post">
이름 : <input type="text" name ="name" value="홍길동"/><br>
전화 : <input type="text" name ="phone"/><br>
성별 : 
<input type="radio" name="gen" value="남" checked="checked"/>남자
<input type="radio" name="gen" value="여"/>여자<br>
<br>
<input type="submit" value="전송">
</form>
</body>
</html>

 

 * friend.py

import cgi

# 사용자(client)가 입력한 자료 받기 - post
form = cgi.FieldStorage()
name = form['name'].value
phone = form['phone'].value
gen = form['gen'].value

# 값으로 DB저장, 연산에 참여 작업 진행필요.
# 단순 출력 진행.
print('Content-Type:text/html;charset=utf-8\n')
print("""
<html>
<body>
이름 : {}<br>
전화번호 : {}<br>
성별 : {}<br>
</body>
</html>
""".format(name, phone, gen))

 * sangpum

# Maria DB의 sangpum table 자료를 웹으로 출력

import MySQLdb
import ast

with open('cgi-bin/mariadb.txt', mode='r') as f:
    config = ast.literal_eval(f.read())
print('Content-Type:text/html;charset=utf-8\n')
print('<html><body><h2>상품자료 (Python 서버이용)</h2>')
print('<table border="1">')
print('<tr><th>코드</th><th>품명</th><th>수량</th><th>단가</th></tr>')
try:
    conn = MySQLdb.connect(**config)
    cursor = conn.cursor()
    cursor.execute("select * from sangdata")
    datas = cursor.fetchall()
    
    for code, sang, su, dan in datas:
        print("""<tr>
        <td>{0}</td>
        <td>{1}</td>
        <td>{2}</td>
        <td>{3}</td>
        </tr>""".format(code, sang, su, dan)) 
except Exception as e:
    print('error',e)
finally:
    cursor.close()
    conn.close()

print('</table></html></body>')

 


35. 챗봇(chatbot)

 

① 사이트 접속 후 파일 다운로드

www.lfd.uci.edu/~gohlke/pythonlibs/#jpype

 

Python Extension Packages for Windows - Christoph Gohlke

by Christoph Gohlke, Laboratory for Fluorescence Dynamics, University of California, Irvine. Updated on 14 February 2021 at 04:31 UTC. This page provides 32- and 64-bit Windows binaries of many scientific open-source extension packages for the official CPy

www.lfd.uci.edu

[JPype1-1.2.0-cp38-cp38-win_amd64.whl] Download

 

② anaconda prompt 접속 후 명령어 입력
cd C:\Users\wonh\Downloads
pip install JPype1-1.2.0-cp38-cp38-win_amd64.whl

pip install --upgrade pip
pip install konlpy

 

 

 * main.html

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
<h1>메인 페이지</h1>
<ul>
	<li>인사관리</li>
	<li>영업관리</li>
	<li>게시판</li>
	<li><a href="cgi-bin/chatbot.py">챗봇</a></li>
</ul>
<pre>
본문
</pre>
<hr>
<footer>바닥글</footer>
</body>
</html>

 * httpServer

# 챗봇용 서버
from http.server import CGIHTTPRequestHandler, HTTPServer

PORT = 8080

class HandlerClass(CGIHTTPRequestHandler):
    cgi_directories = ['/cgi-bin']

serv = HTTPServer(('127.0.0.1', PORT), HandlerClass) # HTTPServer 서버 객체 생성

print('챗봇용 HTTPServer 서비스 시작')
serv.serve_forever()                            # HTTPServer 서비스 시작

 

 * chatbot

import cgi
from botengine import make_reply

# 입력 양식의 글자 추출하기 --- 
form = cgi.FieldStorage()

# 메인 처리 --- 
def go_main():
    m = form.getvalue("m", default="")
    if m == "" : 
        show_form()
    elif m == "say" : 
        api_say()

# 사용자의 입력에 응답하기 --- 
def api_say():
    print("Content-Type: text/plain; charset=utf-8")
    print("")
    txt = form.getvalue("txt", default="")
    if txt == "": 
        return
    res = make_reply(txt)
    print(res)

# 입력 양식 출력하기 --- 
def show_form():
    print("Content-Type: text/html; charset=utf-8")
    print("")
    print("""
    <html><meta charset="utf-8"><body>
    <script src="https://code.jquery.com/jquery-3.1.1.min.js"></script>
    <style>
        h1{ background-color: #ffe0e0; }
        div{ padding:10px; }
        span{ border-radius: 10px; background-color: #ffe0e0; padding:8px; }
        .bot{ text-align: left; }
        .usr{ text-align: right; }
    </style>
    <h1>* 대화하기 *</h1>
    <div id="chat"></div>
    <div class='usr'>
      <input id="txt" size="40">
      <button onclick="say()">전송</button>
    </div>
    <script>
    var url = "./chatbot.py";
    function say() {
      var txt = $('#txt').val();
      $.get(url, {"m":"say","txt":txt},
        function(res) {
          var html = "<div class='usr'><span>" + esc(txt) +
            "</span>: 나</div><div class='bot'> 봇:<span>" + esc(res) + "</span></div>";
          $('#chat').html($('#chat').html()+html);
          $('#txt').val('').focus();
        });
    }
    function esc(s) {
        return s.replace('&', '&amp;').replace('<','&lt;').replace('>', '&gt;');
    }
    </script></body></html>
    """)

go_main()

 

 * botengine

import codecs
from bs4 import BeautifulSoup
import urllib.request
from konlpy.tag import Okt
import os, re, json, random

dict_file = "chatbot-data.json"
dic = {}
twitter = Okt()

# 딕셔너리에 단어 등록하기 --- 
def register_dic(words):
    global dic
    if len(words) == 0: return
    tmp = ["@"]
    for i in words:
        word = i[0]
        if word == "" or word == "\r\n" or word == "\n": continue
        tmp.append(word)
        if len(tmp) < 3: continue
        if len(tmp) > 3: tmp = tmp[1:]
        set_word3(dic, tmp)
        if word == "." or word == "?":
            tmp = ["@"]
            continue
    # 딕셔너리가 변경될 때마다 저장하기
    json.dump(dic, open(dict_file,"w", encoding="utf-8"))

# 딕셔너리에 글 등록하기
def set_word3(dic, s3):
    w1, w2, w3 = s3
    if not w1 in dic: dic[w1] = {}
    if not w2 in dic[w1]: dic[w1][w2] = {}
    if not w3 in dic[w1][w2]: dic[w1][w2][w3] = 0
    dic[w1][w2][w3] += 1

# 문장 만들기 --- 
def make_sentence(head):
    if not head in dic: return ""
    ret = []
    if head != "@": 
        ret.append(head)        
    top = dic[head]
    w1 = word_choice(top)
    w2 = word_choice(top[w1])
    ret.append(w1)
    ret.append(w2)
    while True:
        if w1 in dic and w2 in dic[w1]:
            w3 = word_choice(dic[w1][w2])
        else:
            w3 = ""
        ret.append(w3)
        if w3 == "." or w3 == "? " or w3 == "": 
            break
        w1, w2 = w2, w3
        
    ret = "".join(ret)
    
    # 띄어쓰기
    params = urllib.parse.urlencode({
        "_callback": "",
        "q": ret
    })
    
    # 네이버의 맞춤법 검사기 api를 사용
    data = urllib.request.urlopen("https://m.search.naver.com/p/csearch/ocontent/spellchecker.nhn?" + params)
    data = data.read().decode("utf-8")[1:-2]
    data = json.loads(data)
    data = data["message"]["result"]["html"]
    data = soup = BeautifulSoup(data, "html.parser").getText()
    return data

def word_choice(sel):
    keys = sel.keys()
    return random.choice(list(keys))

# 챗봇 응답 만들기 --- 
def make_reply(text):
    # 단어 학습 시키기
    if not text[-1] in [".", "?"]: text += "."
    words = twitter.pos(text)
    register_dic(words)
    # 사전에 단어가 있다면 그것을 기반으로 문장 만들기
    for word in words:
        face = word[0]
        if face in dic: 
            return make_sentence(face)
    return make_sentence("@")

# 딕셔너리가 있다면 읽어 들이기
if os.path.exists(dict_file):
    dic = json.load(open(dict_file, "r"))


 

 

+ Recent posts

1